# Hugging Face Spaces page scrape header (the Space's status page reported: Runtime error)
project_index = 0 # Initialize project_index with a default value; session counter read/written by next_session() and previous_session()
| import gradio as gr,os,shutil,numpy as np,hashlib,subprocess,pandas as pd | |
| from PIL import Image | |
| from datetime import datetime | |
| import base64 | |
| import requests | |
| import io | |
| import json | |
| import logging | |
| import re | |
| """ | |
| Gradio App Interface Specification: | |
| Inputs: | |
| - prompt: Text Input. Description: 'Enter a prompt for image generation.' | |
| - steps: Slider. Range: [1, 32]. value: 16. Description: 'Number of steps for image generation.' | |
| - model: Dropdown. Options: ['TurboAnime.saftensors', 'OtherModel']. value: 'TurboAnime.saftensors'. Description: 'Select the model for image generation.' | |
| - styles: CheckboxGroup. Options: ['RayORender', 'OtherStyle']. value: ['RayORender']. Description: 'Select styles for image generation.' | |
| Outputs: | |
| - image: Image. Description: 'Generated image based on the prompt.' | |
| - log: Text. Description: 'Log of the image generation process.' | |
| """ | |
| import gradio as gr | |
| import subprocess | |
| import json | |
| import base64 | |
| from PIL import Image | |
| import io | |
| import time | |
# π Start time for the entire script
start_time = time.time()  # Epoch seconds at import; log_status() reports elapsed time relative to this.
def analyze_all_images(custom_prompt):
    """Describe every tracked image via the OpenAI API.

    Walks the global ``file_array``, resolves each entry's path from its
    content hash, requests a description, persists it to the sidecar text
    file, and returns the collected descriptions.
    """
    global file_array
    results = []
    for entry in file_array:
        path = find_file_by_hash(entry["hash"])
        if not path:
            continue
        with Image.open(path) as img:
            caption = get_image_description(img, custom_prompt)
        # Persist the fresh caption, overwriting any existing text file.
        save_text(entry["hash"], caption, "", "", True)
        results.append(caption)
    return results
def generate_status_message(index, total, file_name):
    """Return a human-readable progress line, e.g. 'Processing 3/10: cat.png'."""
    position = index + 1
    return "Processing {}/{}: {}".format(position, total, file_name)
def analyze_thumbs_up_images(custom_prompt):
    """Describe every thumbs-up image and persist each description.

    Returns a pair: (list of (file name, description) tuples, list of
    progress status strings).

    Bug fix: the original saved via ``file_info["hash"]`` — a name that is
    never defined in this function (NameError); the hash is now computed
    from the image path itself with hash_image_pixels().
    """
    thumbs_up_images = load_thumbs_up_gallery()  # full paths to rated-up images
    total_images = len(thumbs_up_images)
    descriptions = []
    status_messages = []
    for index, image_path in enumerate(thumbs_up_images):
        with Image.open(image_path) as img:
            description = get_image_description(img, custom_prompt)
        descriptions.append((os.path.basename(image_path), description))
        status_messages.append(generate_status_message(index, total_images, os.path.basename(image_path)))
        # Save the description to the sidecar text file (was a NameError before).
        save_text(hash_image_pixels(image_path), description, "", "", True)
    return descriptions, status_messages
# π’ Log Status
def log_status(progress, message):
    """Serialize a progress update (status, percent, elapsed seconds) as JSON."""
    seconds_running = time.time() - start_time
    payload = {
        "status": message,
        "progress": f"{progress}%",
        "time_elapsed": f"{seconds_running:.2f} seconds",
    }
    return json.dumps(payload)
def process_files(file_info):
    """Run _process_file over one path or a list of paths; [] when input is falsy.

    Errors are printed and re-raised so the caller still sees the failure.
    """
    try:
        if not file_info:
            return []
        paths = file_info if isinstance(file_info, list) else [file_info]
        return [_process_file(p) for p in paths]
    except Exception as e:
        print(f"Error processing files: {e}")
        raise e
def clear_uploads_folder():
    """Move every file from the current project folder into the recycle bin.

    Nothing is deleted: each file is moved to ``recycle_bin_folder`` so that
    undo_last_deletion() can restore it. Name collisions in the recycle bin
    are resolved by appending ``_1``, ``_2``, ... to the base name.

    NOTE(review): ``recycle_bin_folder`` and ``_get_project_folder`` are not
    defined anywhere in this file — presumably supplied by another chunk;
    confirm before relying on this function.
    """
    os.makedirs(recycle_bin_folder, exist_ok=True)
    for file_name in os.listdir(_get_project_folder()):
        dest_file_path = os.path.join(recycle_bin_folder, file_name)
        if os.path.exists(dest_file_path):
            # Collision: probe base_1, base_2, ... until a free name is found.
            base, extension = os.path.splitext(file_name)
            i = 1
            new_file_name = f"{base}_{i}{extension}"
            new_dest_file_path = os.path.join(recycle_bin_folder, new_file_name)
            while os.path.exists(new_dest_file_path):
                i += 1
                new_file_name = f"{base}_{i}{extension}"
                new_dest_file_path = os.path.join(recycle_bin_folder, new_file_name)
            dest_file_path = new_dest_file_path
        shutil.move(os.path.join(_get_project_folder(), file_name), dest_file_path)
    return "Uploads folder cleared!"
def undo_last_deletion():
    """Restore the most recently deleted file from the recycle bin.

    Bug fix: ``os.listdir`` returns entries in arbitrary order, so taking
    the last list element did not reliably pick the most recent deletion;
    the entry with the newest modification time is chosen instead.
    """
    file_list = os.listdir(recycle_bin_folder)
    if file_list:
        last_deleted_file = max(
            file_list,
            key=lambda name: os.path.getmtime(os.path.join(recycle_bin_folder, name)),
        )
        shutil.move(os.path.join(recycle_bin_folder, last_deleted_file), _get_project_folder())
        return f"Restored: {last_deleted_file}"
    return "No files to restore"
def next_session():
    """Advance to the next project session and (re)initialize its directory."""
    global project_index
    project_index = project_index + 1
    _init_project_directory()
    return f"Switched to the next session! Current session index: {project_index}"
def previous_session():
    """Step back to the previous project session (floored at index 0).

    Consistency fix: mirrors next_session() by re-initializing the project
    directory whenever the session index actually changes; the original
    skipped that step, leaving the app pointed at the wrong directory.
    """
    global project_index
    if project_index > 0:
        project_index -= 1
        _init_project_directory()
    return f"Switched to the previous session! Current session index: {project_index}"
def get_next_image():
    """Cycle forward through get_images(); fall back to default_image when empty."""
    global current_image
    images = get_images()
    if not images:
        current_image = default_image
        return current_image
    if current_image is None:
        current_image = images[0]
    else:
        position = images.index(current_image)
        current_image = images[(position + 1) % len(images)]
    return current_image
def get_previous_image():
    """Cycle backward through get_images(); fall back to default_image when empty."""
    global current_image
    images = get_images()
    if not images:
        current_image = default_image
        return current_image
    if current_image is None:
        current_image = images[-1]
    else:
        position = images.index(current_image)
        current_image = images[(position - 1) % len(images)]
    return current_image
def process_all_zips():
    """Run _process_file on every zip archive in the current project folder.

    Bug fix: ``zipfile`` was never imported at module level, so calling this
    function raised NameError; it is imported locally here (stdlib).
    """
    import zipfile  # missing from the module-level imports
    project_folder = _get_project_folder()
    for file_name in os.listdir(project_folder):
        file_path = os.path.join(project_folder, file_name)
        if zipfile.is_zipfile(file_path):
            _process_file(file_path)
    return "All zip files processed!"
# π API Call
def make_api_call(url, payload):
    """POST ``payload`` as JSON to ``url`` using the curl CLI.

    Returns ``(parsed_json, None)`` on success or ``(None, error_log_json)``
    on failure.

    Bug fix: both error paths referenced an undefined ``progress`` variable
    (NameError masked the real failure); progress is now reported as 0 when
    the call fails.
    """
    progress = 0  # no progress to report on failure
    try:
        response = subprocess.run(
            ["curl", "-X", "POST", url,
             "-H", "Content-Type: application/json",
             "--data", json.dumps(payload)],
            capture_output=True, text=True,
        )
        if response.returncode == 0:
            return json.loads(response.stdout), None
        else:
            return None, log_status(progress, f"API call failed with return code: {response.returncode}")
    except Exception as e:
        return None, log_status(progress, f"API call failed with exception: {str(e)}")
# πΌοΈ Save Image
def save_image(image_data):
    """Decode a base64-encoded image payload into a PIL Image object."""
    raw_bytes = base64.b64decode(image_data)
    return Image.open(io.BytesIO(raw_bytes))
# π Main Execution Function
def generate_image(prompt, steps, model, styles):
    """Call a Stable Diffusion WebUI txt2img endpoint and return the result.

    Returns (PIL image, JSON log string) on success, (None, log) on failure.

    NOTE(review): the endpoint is a hard-coded residential IP — confirm it
    is still reachable and intended to ship in source.
    """
    base_url = "http://73.255.78.150:7909"
    # Full AUTOMATIC1111-style txt2img payload; most fields are fixed defaults.
    base_payload = {
        "prompt": prompt,
        "steps": steps,
        "model": model,
        "styles": styles,
        "negative_prompt": "album, duplicate, crowded, multiple, stuff, messy, photo, collage, doll, caricature, render. mannequin.",
        "seed": -1,  # -1 = random seed
        "height": 768,
        "width": 1280,
        "sampler_index": "DPM++ 2M SDE Karras",
        "restore_faces": False,
        "tiling": False,
        "n_iter": 1,
        "batch_size": 1,
        "cfg_scale": 2.0,
        "subseed": -1,
        "subseed_strength": 0.0,
        "seed_resize_from_h": -1,
        "seed_resize_from_w": -1,
        "seed_enable_extras": False,
        "enable_hr": False,  # high-res fix disabled, so the hr_* fields below are inert
        "denoising_strength": 0.0,
        "hr_scale": 2.0,
        "hr_upscaler": "Latent",
        "hr_second_pass_steps": 0,
        "hr_resize_x": 0,
        "hr_resize_y": 0,
        "hr_sampler_index": "",
        "hr_prompt": "",
        "hr_negative_prompt": "",
        "override_settings_texts": ""
    }
    progress = 0
    log = log_status(progress, "Starting image generation...")
    response, error_log = make_api_call(f"{base_url}/sdapi/v1/txt2img", base_payload)
    if error_log:
        return None, error_log
    if response and 'images' in response:
        # The API returns base64-encoded images; decode the first one.
        image_data = response['images'][0]
        image = save_image(image_data)
        log = log_status(100, "Image generated successfully.")
        return image, log
    else:
        log = log_status(progress, "Failed to generate image.")
        return None, log
max_image_size = 768  # max height (px) used when normalizing images for hashing
image_folder = "/Users/gev7418/Library/CloudStorage/OneDrive-Personal/Gradio Script Building Blocks/HatchDuo-Gradio/Images"
thumbs_up_folder = os.path.join(image_folder, "Thumbs_Up")
thumbs_down_folder = os.path.join(image_folder, "Thumbs_Down")
allowed_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp']
folders_cycle = [image_folder, thumbs_up_folder, thumbs_down_folder]  # scan order: pending, up, down
# Set up logging to print to the terminal
logging.basicConfig(level=logging.INFO)
# Security fix: the original hard-coded a live-looking OpenAI API key in
# source control. Read it from the environment instead.
api_key = os.environ.get("OPENAI_API_KEY", "")  # Set OPENAI_API_KEY before launching
| import subprocess | |
def convert_and_rename_files_in_directory(directory, original_extension, new_extension, new_base_name):
    """
    Convert every ``original_extension`` file in ``directory`` to
    ``new_extension`` via the ImageMagick ``convert`` CLI, naming outputs
    ``{new_base_name}_{index}{new_extension}`` in sorted order.

    Fix: the original followed each conversion with ``os.rename`` of the
    output file onto its own identical path — a no-op — now removed.
    NOTE(review): source files are left in place after conversion; confirm
    whether they should be deleted.
    """
    files = [f for f in os.listdir(directory) if f.endswith(original_extension)]
    for index, file in enumerate(sorted(files)):
        new_name = f"{new_base_name}_{index}{new_extension}"
        original_file_path = os.path.join(directory, file)
        new_file_path = os.path.join(directory, new_name)
        # Convert using ImageMagick's command-line tool.
        subprocess.run(['convert', original_file_path, new_file_path])
    logging.info(f"All {original_extension} files in {directory} have been converted to PNG and renamed to {new_base_name} format.")
def encode_image(image):
    """
    Serialize a PIL image to JPEG bytes and return the base64 string.
    RGBA inputs are flattened to RGB first, since JPEG has no alpha channel.
    """
    logging.info("Encoding image to base64")
    if image.mode == 'RGBA':
        image = image.convert('RGB')
    buffer = io.BytesIO()
    image.save(buffer, format='JPEG')
    return base64.b64encode(buffer.getvalue()).decode('utf-8')
def get_image_description(image, custom_prompt):
    """
    Send the image plus a text prompt to the OpenAI vision API and return
    the model's reply text. A blank custom prompt falls back to a default.
    """
    logging.info("Getting image description from OpenAI API")
    encoded = encode_image(image)
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    # Blank/whitespace prompts fall back to a generic instruction.
    prompt_text = custom_prompt if custom_prompt.strip() else "Describe this image"
    payload = {
        "model": "gpt-4-vision-preview",
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt_text},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded}"}},
                ],
            }
        ],
        "max_tokens": 300,
    }
    response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
    response_payload = json.loads(response.text)
    content = response_payload['choices'][0]['message']['content']
    logging.info("Received response from OpenAI API")
    return content
def create_directories():
    """Ensure the thumbs-up and thumbs-down folders exist."""
    print("Creating directories...")
    for folder in (thumbs_up_folder, thumbs_down_folder):
        os.makedirs(folder, exist_ok=True)
create_directories()
def get_files(folder):
    """Recursively collect paths of files whose extension is in allowed_extensions."""
    print(f"Getting files from {folder}...")
    matches = []
    for dirpath, _dirnames, filenames in os.walk(folder):
        for name in filenames:
            if os.path.splitext(name)[1].lower() in allowed_extensions:
                matches.append(os.path.join(dirpath, name))
    return matches
def hash_image_pixels(file_path):
    """
    Generate a SHA-256 hash of an image's pixel content.

    The image is normalized first — RGBA mode, height capped at
    ``max_image_size`` with aspect ratio preserved, PNG-encoded — so that
    visually identical files hash the same regardless of on-disk format.
    Returns None for files without an allowed image extension.

    Fix: the original computed the target width/height twice (the second
    ``if img_rgba.height > max_image_size`` block repeated the first
    min()-based computation exactly); it is now computed once.
    """
    if os.path.splitext(file_path)[1].lower() not in allowed_extensions:
        return None
    with Image.open(file_path) as img:
        # Standardize the mode so differently-encoded files compare equal.
        img_rgba = img.convert('RGBA')
        # Cap height at max_image_size, preserving aspect ratio.
        aspect_ratio = img_rgba.width / img_rgba.height
        new_height = min(max_image_size, img_rgba.height)
        new_width = int(aspect_ratio * new_height)
        # LANCZOS resampling for deterministic, high-quality resizing.
        img_resized = img_rgba.resize((new_width, new_height), Image.Resampling.LANCZOS)
        img_bytes = io.BytesIO()
        img_resized.save(img_bytes, format='PNG')  # PNG for cross-platform byte stability
        return hashlib.sha256(img_bytes.getvalue()).hexdigest()
# Global cache for hashes to file paths
hash_to_path_cache = {}
def update_hash_to_path_cache():
    """Rebuild the hash→path cache by rescanning every folder in folders_cycle."""
    global hash_to_path_cache
    hash_to_path_cache.clear()
    for folder in folders_cycle:
        for path in get_files(folder):
            hash_to_path_cache[hash_image_pixels(path)] = path
def find_file_by_hash(file_hash):
    """
    Resolve a content hash to a file path.

    Tries the cache first; on a miss or stale entry, rescans the thumbs-up
    and thumbs-down folders for a pixel-level match, refreshing the cache
    when found. Returns None when no matching file exists.
    """
    cached = hash_to_path_cache.get(file_hash)
    if cached and os.path.exists(cached):
        return cached
    # Cache miss or stale path: rescan the rated folders for a pixel match.
    for folder in (thumbs_up_folder, thumbs_down_folder):
        for dirpath, _dirnames, filenames in os.walk(folder):
            for name in filenames:
                if os.path.splitext(name)[1].lower() not in allowed_extensions:
                    continue
                candidate = os.path.join(dirpath, name)
                if hash_image_pixels(candidate) == file_hash:
                    hash_to_path_cache[file_hash] = candidate  # refresh the cache
                    return candidate
    return None
def build_file_array():
    """
    Scan all tracked folders and build the master file list.

    Each entry records path, content hash, rating status and mtime; the
    list is returned newest-first. Any image missing its sidecar ``.txt``
    gets an empty one created.

    Fix: the sidecar path was derived with ``str.replace(ext, '.txt')``,
    which replaces *every* occurrence of the extension substring anywhere
    in the path; ``os.path.splitext`` is used instead.
    """
    print("Building file array...")
    files = []
    for folder in folders_cycle:
        for file_path in get_files(folder):
            file_hash = hash_image_pixels(file_path)
            status = "π Rated Up" if folder == thumbs_up_folder else "π Rated Down" if folder == thumbs_down_folder else "β³ Pending"
            files.append({"path": file_path, "hash": file_hash, "status": status, "date_modified": os.path.getmtime(file_path)})
            # Create text files for any images missing their text file counterpart
            text_file_path = os.path.splitext(file_path)[0] + '.txt'
            if not os.path.exists(text_file_path):
                with open(text_file_path, 'w') as text_file:
                    text_file.write("")  # empty placeholder sidecar
    files.sort(key=lambda x: x["date_modified"], reverse=True)
    return files
def refresh_file_array_and_hashes():
    """Rebuild the global file_array/hash_list and the hash→path cache."""
    global file_array, hash_list
    file_array = build_file_array()
    hash_list = [file['hash'] for file in file_array]
    print("File array and hash list updated.")
    update_hash_to_path_cache()
# Populate the globals once at import time.
refresh_file_array_and_hashes()
current_index = 0  # index into hash_list of the image currently shown in the UI
def save_text(file_hash, text, prepend_text="", append_text="", save_and_overwrite_changes=True):
    """
    Write ``text`` (with optional prepend/append fragments) to the sidecar
    .txt file of the image identified by ``file_hash``.

    Does nothing when ``save_and_overwrite_changes`` is False.

    Fixes: the original contained an unreachable branch that re-tested
    ``not save_and_overwrite_changes`` after already returning early on
    that condition (dead code, removed), and derived the .txt path with
    ``str.replace`` on the extension, which can corrupt paths containing
    the extension substring elsewhere; ``os.path.splitext`` is used.
    """
    if not save_and_overwrite_changes:
        print("Skipping saving due to user preference.")
        return
    file_path = find_file_by_hash(file_hash)
    if not file_path:
        return
    text_file_path = os.path.splitext(file_path)[0] + '.txt'
    final_text = (prepend_text + ", " if prepend_text else "") + text + (", " + append_text if append_text else "")
    print(f"Saving text for file hash {file_hash} to {text_file_path}...")
    with open(text_file_path, 'w') as text_file:
        text_file.write(final_text)
def get_text(file_hash):
    """
    Return the sidecar .txt contents for the image identified by
    ``file_hash``, or "" when the image or its text file is missing.

    Fix: derives the .txt path with os.path.splitext instead of
    str.replace (which replaced every occurrence of the extension
    substring in the whole path).
    """
    file_path = find_file_by_hash(file_hash)
    if file_path:
        text_file_path = os.path.splitext(file_path)[0] + '.txt'
        if os.path.exists(text_file_path):
            print(f"Getting text for file hash {file_hash} from {text_file_path}...")
            with open(text_file_path) as text_file:
                return text_file.read()
    return ""
def update_index_for_navigation(current_text, prepend_text, append_text, navigate_forward=True, save_and_overwrite_changes=False):
    """
    Optionally persist the current caption, then move ``current_index`` one
    step forward or backward through ``hash_list``, wrapping at the ends.

    Fix: guards against an empty ``hash_list``, which previously raised
    ZeroDivisionError from the modulo (and IndexError from the lookup).
    """
    global current_index, file_array, hash_list
    if not hash_list:
        return  # nothing to navigate
    if current_text or not save_and_overwrite_changes:  # Save only if there's something to save or if saving changes is not required
        file_hash = hash_list[current_index]
        save_text(file_hash, current_text, prepend_text, append_text, save_and_overwrite_changes)
    step = 1 if navigate_forward else -1
    current_index = (current_index + step) % len(hash_list)  # wrap around either end
def get_file(navigate_forward, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt):
    """
    Advance/retreat through the image list and return the full UI state.

    Saves the current caption first (via update_index_for_navigation),
    optionally refreshes the description from the OpenAI API, resizes the
    image for display, and returns a 10-tuple:
    (image array, hash, status, text, file name, file path, relative path,
    text file name, text file path, relative text path).
    """
    global current_index, file_array, hash_list
    print(f"Getting file, navigate_forward: {navigate_forward}...")
    update_index_for_navigation(current_text, prepend_text, append_text, navigate_forward, save_and_overwrite_changes)
    if current_index < len(hash_list):
        current_hash = hash_list[current_index]
        file_info = next((item for item in file_array if item["hash"] == current_hash), None)
        if file_info:
            file_path = find_file_by_hash(current_hash)
            if file_path:
                with Image.open(file_path) as img:
                    # Calculate the new height and width to maintain aspect ratio, with a max height of 768 for display in gradio
                    aspect_ratio = img.width / img.height
                    new_height = min(768, img.height)  # Set max height to 768 for gradio display
                    new_width = int(aspect_ratio * new_height)
                    # Use Image.Resampling.LANCZOS for better quality resizing
                    img_resized = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
                    img_resized_bytes = io.BytesIO()
                    img_resized.save(img_resized_bytes, format='PNG')
                    img_resized_bytes = img_resized_bytes.getvalue()
                    text = get_text(current_hash)  # Load text file contents into textbox
                    if not pause_api_call:
                        text = get_image_description(img, custom_prompt)  # Get new description from OpenAI
                        save_text(current_hash, text, prepend_text, append_text, save_and_overwrite_changes)  # Optionally save the new description
                    # Convert the resized image to a numpy array for display
                    img_resized_np = np.array(Image.open(io.BytesIO(img_resized_bytes)))
                    # Return the 10-tuple of UI fields (sidecar paths derived via str.replace on the extension).
                    return img_resized_np, current_hash, file_info["status"], text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), os.path.basename(file_path).replace(os.path.splitext(os.path.basename(file_path))[1], '.txt'), find_file_by_hash(current_hash).replace(os.path.splitext(find_file_by_hash(current_hash))[1], '.txt'), os.path.relpath(find_file_by_hash(current_hash).replace(os.path.splitext(find_file_by_hash(current_hash))[1], '.txt'), start=image_folder)
    return None, "File not found", "β³ Pending", "", "", "", "", "", "", ""
def move_file(direction, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt):
    """
    Move the current image into the thumbs-up/down folder ("up"/"down"),
    refresh its description, and advance the UI to the next file.

    Bug fix: the original moved the file and then opened it at its *old*
    path (guaranteed FileNotFoundError); the image is now opened at its
    new location, and the hash→path cache is updated to match.
    """
    global current_index, file_array, hash_list
    print(f"Moving file in direction {direction}...")
    if current_index < len(hash_list):
        current_hash = hash_list[current_index]
        file_info = next((item for item in file_array if item["hash"] == current_hash), None)
        if file_info:
            source_file = find_file_by_hash(current_hash)
            if source_file:
                destination = thumbs_up_folder if direction == "up" else thumbs_down_folder if direction == "down" else None
                new_status = "π Rated Up" if direction == "up" else "π Rated Down" if direction == "down" else None
                if destination:
                    moved_path = os.path.join(destination, os.path.basename(source_file))
                    shutil.move(source_file, moved_path)
                    file_info["status"] = new_status
                    hash_to_path_cache[current_hash] = moved_path  # keep the cache consistent
                    # Get a fresh description from the image at its NEW location.
                    with Image.open(moved_path) as img:
                        description = get_image_description(img, custom_prompt)
                    save_text(current_hash, description, prepend_text, append_text, save_and_overwrite_changes)
                    return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt)
    return None, "Invalid direction or file not found", "β³ Pending", "", "", "", "", "", "", ""
def reset_files(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt):
    """
    Move every image (and its sidecar .txt) back to the root image folder,
    blank the text files, reset all statuses to pending, and rebuild the
    global indexes before showing the next file.

    NOTE(review): when an image has no sidecar .txt, ``source_text`` still
    names the would-be path and the ``open(..., 'w')`` below creates an
    empty file at the root — apparently intentional, but confirm.
    """
    global file_array, current_index, hash_list
    print("Resetting files and clearing text...")
    for file_info in file_array:
        source_file = find_file_by_hash(file_info["hash"])
        if source_file and os.path.exists(source_file):
            source_text = source_file.replace(os.path.splitext(source_file)[1], '.txt')
            if os.path.exists(source_text):
                shutil.move(source_text, os.path.join(image_folder, os.path.basename(source_text)))
            shutil.move(source_file, os.path.join(image_folder, os.path.basename(source_file)))
            # Blank the (possibly just-moved) text file at its new root location.
            with open(os.path.join(image_folder, os.path.basename(source_text)), 'w') as text_file:
                text_file.write("")
            file_info["status"] = "β³ Pending"
    file_array = build_file_array()
    hash_list = [file['hash'] for file in file_array]
    update_hash_to_path_cache()
    return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt)
def delete_file(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt):
    """
    Permanently delete the current image and its sidecar .txt, drop it from
    the global indexes, and show the next file.

    Fixes: deleting an image whose .txt sidecar was missing raised
    FileNotFoundError (now guarded), and the sidecar path is derived via
    os.path.splitext instead of str.replace on the extension substring.
    """
    global current_index, file_array, hash_list
    print("Deleting file...")
    if current_index < len(hash_list):
        current_hash = hash_list[current_index]
        source_file = find_file_by_hash(current_hash)
        if source_file:
            source_text = os.path.splitext(source_file)[0] + '.txt'
            os.remove(source_file)
            if os.path.exists(source_text):  # sidecar may legitimately be absent
                os.remove(source_text)
            hash_list.remove(current_hash)  # Remove the hash from the hash_list
            file_array = [file for file in file_array if file["hash"] != current_hash]  # drop the deleted entry
            if current_index >= len(hash_list):
                current_index = len(hash_list) - 1
            update_hash_to_path_cache()
    return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt)
def load_image_details():
    """Assemble a DataFrame of every image's text, rating, hash and paths."""
    print("Loading image details...")
    rows = []
    for folder in folders_cycle:
        for file_path in get_files(folder):
            if folder == thumbs_up_folder:
                status = "π Rated Up"
            elif folder == thumbs_down_folder:
                status = "π Rated Down"
            else:
                status = "β³ Pending"
            text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt')
            text_content = ""
            if os.path.exists(text_file_path):
                with open(text_file_path, 'r') as text_file:
                    text_content = text_file.read()
            rows.append({
                "Text Content": text_content,
                "Rating Status": status,
                "File Hash": hash_image_pixels(file_path),
                "Image Path": file_path,
                "Text Path": text_file_path if os.path.exists(text_file_path) else "N/A",
            })
    return pd.DataFrame(rows)
def load_text_files_as_df():
    """Collect every .txt file in the image folder into a DataFrame."""
    print("Loading text files into dataframe...")
    records = []
    for file in os.listdir(image_folder):
        if not file.endswith('.txt'):
            continue
        file_path = os.path.join(image_folder, file)
        with open(file_path, 'r') as text_file:
            content = text_file.read()
        modified = datetime.fromtimestamp(os.path.getmtime(file_path)).strftime('%Y-%m-%d %H:%M:%S')
        records.append({
            "File Name": file.replace('.txt', ''),
            "Content": content,
            "Date Modified": modified,
        })
    return pd.DataFrame(records)
# Function to merge text_files_df and image_details_df into a single dataframe
def load_combined_details():
    """Merge the text-file and image-detail frames on 'File Name', reordered for the UI."""
    print("Loading text file details...")
    text_df = load_text_files_as_df()
    image_details_df = load_image_details()
    # Derive 'File Name' from 'Image Path' so the outer merge can align rows.
    def _stem(path):
        base = os.path.basename(path)
        return base.replace(os.path.splitext(base)[1], '')
    image_details_df['File Name'] = image_details_df['Image Path'].apply(_stem)
    combined_df = pd.merge(text_df, image_details_df, on="File Name", how="outer")
    # Present columns in the UI's preferred order.
    column_order = ['Date Modified', 'Rating Status', 'File Name', 'Text Content', 'File Hash', 'Text Path', 'Image Path']
    return combined_df[column_order]
def load_gallery():
    """List root-folder images, sorted case-insensitively by file name."""
    print("Loading gallery...")
    images = [
        os.path.join(image_folder, name)
        for name in os.listdir(image_folder)
        if os.path.splitext(name)[1].lower() in allowed_extensions
    ]
    images.sort(key=lambda p: os.path.basename(p).lower())
    return images
def load_thumbs_up_gallery():
    """List thumbs-up images, sorted case-insensitively by file name."""
    print("Loading Thumbs Up gallery...")
    images = [
        os.path.join(thumbs_up_folder, name)
        for name in os.listdir(thumbs_up_folder)
        if os.path.splitext(name)[1].lower() in allowed_extensions
    ]
    images.sort(key=lambda p: os.path.basename(p).lower())
    return images
def load_thumbs_down_gallery():
    """List thumbs-down images, sorted case-insensitively by file name."""
    print("Loading Thumbs Down gallery...")
    images = [
        os.path.join(thumbs_down_folder, name)
        for name in os.listdir(thumbs_down_folder)
        if os.path.splitext(name)[1].lower() in allowed_extensions
    ]
    images.sort(key=lambda p: os.path.basename(p).lower())
    return images
def load_all_folders_gallery():
    """Concatenate per-folder, name-sorted galleries across all tracked folders."""
    print("Loading all folders gallery...")
    all_images = []
    for folder in folders_cycle:
        folder_images = [
            os.path.join(folder, name)
            for name in os.listdir(folder)
            if os.path.splitext(name)[1].lower() in allowed_extensions
        ]
        folder_images.sort(key=lambda p: os.path.basename(p).lower())
        all_images.extend(folder_images)
    return all_images
def load_text_files():
    """
    Return [name, contents] pairs for every .txt in the image folder,
    sorted case-insensitively by name.

    Fix: the original opened each file inside a list comprehension without
    ever closing it (file-descriptor leak); files are now read via ``with``.
    """
    print("Loading text files...")
    entries = []
    for file in os.listdir(image_folder):
        if file.endswith('.txt'):
            with open(os.path.join(image_folder, file), 'r') as fh:
                entries.append([file, fh.read()])
    return sorted(entries, key=lambda x: x[0].lower())
def update_text_content(df):
    """
    Persist edited text content coming back from the UI dataframe.

    Each row is expected to be (file_name, new_content, date_added); rows
    of any other width are reported and skipped.

    Bug fix: the original called an undefined ``hash_file`` (NameError);
    the hash is computed with ``hash_image_pixels``, the hashing function
    used everywhere else in this file.
    """
    global text_files_df
    print("Updating text content...")
    for row in df:
        if len(row) == 3:
            file_name, new_content, date_added = row
            file_path = os.path.join(image_folder, file_name)
            if os.path.exists(file_path):
                file_hash = hash_image_pixels(file_path)
                save_text(file_hash, new_content)
        else:
            print("Error: Unexpected number of values in row. Expected 3 values per row.")
# Step 1: Define a function to update the dataframe
def refresh_text_files():
    """
    Re-read every .txt in the image folder and return [name, contents]
    pairs sorted case-insensitively by name.

    Fix: files are read with ``with`` instead of being left open by a
    list-comprehension ``open(...).read()`` (file-descriptor leak).
    """
    print("Refreshing text files...")
    refreshed = []
    for file in os.listdir(image_folder):
        if file.endswith('.txt'):
            with open(os.path.join(image_folder, file), 'r') as fh:
                refreshed.append([file, fh.read()])
    return sorted(refreshed, key=lambda x: x[0].lower())
def save_uploaded_image(image_file):
    """
    Attempts to save the uploaded image to the designated image folder.
    Returns the path of the saved image or None if an error occurs.

    NOTE(review): assumes ``image_file`` exposes ``.name`` and ``.content``
    (raw bytes) — confirm against the upload component actually used; gradio
    file objects typically expose a temp-file path rather than ``.content``.
    """
    try:
        os.makedirs(image_folder, exist_ok=True) # Ensure the image folder exists
        image_path = os.path.join(image_folder, image_file.name)
        with open(image_path, "wb") as file:
            file.write(image_file.content) # Write the byte content of the file
        print(f"Image saved successfully: {image_path}")
        return image_path
    except Exception as e:
        print(f"Error saving image: {e}")
        return None
def create_text_file_for_image(image_path):
    """
    Create an empty sidecar .txt next to ``image_path`` (same base name)
    and return the new file's path.
    """
    base, _ext = os.path.splitext(image_path)
    text_file_path = base + ".txt"
    with open(text_file_path, 'w'):
        pass  # touch: create the file empty
    return text_file_path
def handle_image_upload(uploaded_files):
    """
    Persist each uploaded file (allowed image types only) under a
    hash-derived unique name, write a companion .txt note, and return a
    newline-joined status report.
    """
    status_messages = []
    for uploaded_file in uploaded_files:
        file_extension = os.path.splitext(uploaded_file.name)[1].lower()
        if file_extension not in allowed_extensions:
            status_messages.append(f"File {uploaded_file.name} has an unsupported extension ({file_extension}) and was not uploaded.")
            continue
        os.makedirs(image_folder, exist_ok=True)  # make sure the target folder exists
        # Unique name: SHA-256 of the original file name plus its extension.
        unique_name = hashlib.sha256(uploaded_file.name.encode()).hexdigest() + file_extension
        save_path = os.path.join(image_folder, unique_name)
        with open(save_path, "wb") as file:
            file.write(uploaded_file.content)
        # Companion text file recording the upload.
        with open(save_path + ".txt", "w") as text_file:
            text_file.write(f"Image file: {unique_name} uploaded successfully.")
        status_messages.append(f"Uploaded and saved {uploaded_file.name} successfully.")
    return "\n".join(status_messages)
def refresh_image_description(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt):
    """
    Re-run the OpenAI description for the current image (unless
    ``pause_api_call`` is set) and return the full 10-field UI state tuple.

    Bug fix: when ``pause_api_call`` was True, the original fell through to
    a return statement that referenced ``file_info`` and ``file_path``
    before assignment (NameError); both are now resolved up front and
    shared by the two return branches.
    """
    global current_index, file_array
    file_info = file_array[current_index]
    file_path = find_file_by_hash(file_info["hash"])
    # Sidecar-text paths used by both return branches (str.replace kept for
    # parity with get_file's path derivation).
    text_name = os.path.basename(file_path).replace(os.path.splitext(os.path.basename(file_path))[1], '.txt')
    text_path = file_path.replace(os.path.splitext(file_path)[1], '.txt')
    rel_text_path = os.path.relpath(text_path, start=image_folder)
    if not pause_api_call and file_path:
        with Image.open(file_path) as img:
            # Use the custom_prompt parameter when calling get_image_description
            text = get_image_description(img, custom_prompt)
            save_text(file_info["hash"], text, prepend_text, append_text, save_and_overwrite_changes)
            return np.array(img), file_info["hash"], file_info["status"], text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), text_name, text_path, rel_text_path
    # API call paused: return the current state without changes.
    return None, file_info["hash"], file_info["status"], current_text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), text_name, text_path, rel_text_path
# Backup functionality for successful app launches
backup_folder = "Backup_Scripts"
os.makedirs(backup_folder, exist_ok=True)
def backup_script():
    """Copy the currently running script into the backup folder with a timestamped name."""
    current_script_path = os.path.realpath(__file__)
    stamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    backup_name = f"backup_{os.path.basename(current_script_path)}_{stamp}.py"
    backup_script_path = os.path.join(backup_folder, backup_name)
    shutil.copy2(current_script_path, backup_script_path)
    print(f"Backup of the script created at {backup_script_path}")
def app_launch_success():
    """
    Hook invoked when the app launches successfully; archives the current
    script via backup_script().
    """
    print("App successfully launched. Creating a backup of the script...")
    backup_script()
def refresh_all():
    """Reload every dynamic panel and reset the browsing cursor.

    Returns:
        tuple: (combined details table, main gallery items, thumbs-up
        gallery items) for the three Gradio outputs wired to the
        Refresh button.
    """
    # BUG FIX: the original assigned `current_index = 0` without declaring it
    # global, creating a dead local — the module-level cursor was never reset
    # despite the comment saying so.
    global current_index
    combined_details = load_combined_details()
    image_gallery_content = load_gallery()
    thumbs_up_gallery_content = load_thumbs_up_gallery()
    # Reinitialize the hash array and the index for new images
    update_hash_to_path_cache()
    refresh_file_array_and_hashes()
    current_index = 0  # Reset the index to start from the first image
    return combined_details, image_gallery_content, thumbs_up_gallery_content
def combined_action(prompt, steps, model, styles, custom_prompt):
    """Analyze the current image, then generate a new image if analysis worked.

    Args:
        prompt: text prompt for image generation.
        steps: number of diffusion steps.
        model: model name selected in the dropdown.
        styles: list of selected style names.
        custom_prompt: custom instructions passed to the OpenAI description call.

    Returns:
        tuple: (generated_image or None, log string, description string).
    """
    # Trigger the OpenAI API for description and wait for the response
    description, status = trigger_openai_api(custom_prompt)
    # BUG FIX: trigger_openai_api reports "Triggered OpenAI API successfully."
    # or "Failed to trigger OpenAI API." — it never returns the literal
    # "success", so the original `status == "success"` comparison made the
    # generation branch unreachable. Match the success message instead.
    if "success" in status.lower():
        # Once the response is received, generate the image based on the prompt and other parameters
        generated_image, log = generate_image(prompt, steps, model, styles)
    else:
        generated_image, log = None, "Failed to generate image due to API call failure."
    return generated_image, log, description
| # π Setup Environment π | |
| import gradio as gr | |
| import os | |
| from pathlib import Path | |
| import subprocess | |
# Define the directory to store uploaded images.
# All uploaded images, their sidecar caption .txt files, and rating state
# live under this folder; most helpers in this file read it as a global.
image_folder = "./Images"
os.makedirs(image_folder, exist_ok=True)  # Create the directory if it doesn't exist
| # π Function Definitions π | |
def save_and_show_images(uploaded_files):
    """Persist uploaded image bytes into ``image_folder`` and return their paths.

    Each upload is saved as a unique ``Img-<n>.<ext>`` (jpg vs png sniffed
    from the JPEG magic bytes) alongside a sidecar ``Img-<n>.txt`` caption
    file; a final pass guarantees every image in the folder has a matching
    text file.

    Args:
        uploaded_files: list of raw image byte strings from the gr.File input.

    Returns:
        list[str]: filesystem paths of the saved images, for the gallery.
    """
    images_to_display = []
    # Process each uploaded file
    for index, uploaded_file in enumerate(uploaded_files):
        # Define a base file name with index
        base_file_name = f"Img-{index}"
        # Collect the stems already used so we never overwrite an existing file.
        # BUG FIX: the original substring test (`base_file_name in file.name`)
        # wrongly treated "Img-1" as taken whenever e.g. "Img-10.jpg" existed,
        # skipping perfectly free names; compare the file stem exactly instead.
        existing_stems = {
            f.stem for f in Path(image_folder).rglob('*')
            if f.is_file() and f.suffix in ['.jpg', '.png', '.txt']
        }
        while base_file_name in existing_stems:
            index += 1
            base_file_name = f"Img-{index}"
        # Determine the file extension (jpg or png) from the JPEG magic number
        file_extension = "jpg" if uploaded_file[:3] == b'\xff\xd8\xff' else "png"
        final_file_name = f"{base_file_name}.{file_extension}"
        image_path = os.path.join(image_folder, final_file_name)
        # Write the bytes to a new file in the specified directory
        with open(image_path, "wb") as file:
            file.write(uploaded_file)
        # Create a corresponding text file for the image
        text_file_path = os.path.join(image_folder, f"{base_file_name}.txt")
        with open(text_file_path, "w") as text_file:
            text_file.write(f"Image file: {final_file_name}")
        # Add the path (or ideally, a URL to the file) to the list to display
        images_to_display.append(image_path)
    # Ensure all images have a corresponding text file
    for image_file_path in Path(image_folder).rglob('*'):
        if image_file_path.suffix in ['.jpg', '.png']:
            base_name = os.path.splitext(image_file_path.name)[0]
            text_file_path = os.path.join(image_folder, f"{base_name}.txt")
            if not os.path.exists(text_file_path):
                with open(text_file_path, "w") as text_file:
                    text_file.write(f"Image file: {image_file_path.name}")
    return images_to_display
# --- Gradio UI definition, part 1: upload, viewer, analysis controls ---
# Indentation reconstructed from a whitespace-mangled source; nesting of the
# Row/Column containers is a best-effort reading — TODO confirm against the
# original file. All code tokens and runtime strings are unchanged.
with gr.Blocks() as app:
    # Upload row: dropped files are saved via save_and_show_images and echoed
    # back into a gallery.
    with gr.Row():
        upload_btn = gr.File(label="Upload Images", type="binary", file_count='multiple')
        gallery = gr.Gallery(label="Uploaded Images Gallery")
        upload_btn.change(fn=save_and_show_images, inputs=upload_btn, outputs=gallery)
    with gr.Row():
        with gr.Column():
            gr.Markdown("## Image Viewer")
        with gr.Column():
            # Add an upload button for uploading files to the images folder
            image = gr.Image(height=768, label="Current Image", image_mode="RGBA", width="max", elem_id="current_image", type="numpy")
            with gr.Row():
                thumbs_down_btn = gr.Button("π Rate Down")
                thumbs_up_btn = gr.Button("π Rate Up")
            with gr.Row():
                prev_btn = gr.Button("β¬ Previous")
                next_btn = gr.Button("Next β‘")
            with gr.Row():
                status_display = gr.Textbox(label="Rating Status", interactive=False)
                image_path_display = gr.Textbox(label="Image Path", interactive=False)
        with gr.Column():
            # Existing setup for checkboxes
            with gr.Row():
                pause_api_call_checkbox = gr.Checkbox(label="Pause OpenAI API Calls", value=True)
                save_and_overwrite_changes_checkbox = gr.Checkbox(label="Save and Overwrite Changes", value=True)
            # New textbox for custom OpenAI API instructions (default value is a
            # long few-shot prompt with example persona profiles and captions).
            custom_instructions = gr.Textbox(label="Custom Instructions", value="Use the art of deduction and creativity to generate a persona profile and 3 inspiration words to describe the image without describing the subject. Wrap it up with a concise 1 sentence caption of the image with the subject in high detail. JSON Format needed: Futuristic Persona Profile: The subject exudes a sense of readiness and authority, dressed in attire that hints at a future dominated by advanced technology and interstellar travel. This character could be envisioned as a commander or a pioneer in a futuristic saga, marked by their composed nature and the streamlined design of their gear. Futuristic Caption: A resolute figure of tomorrow, standing firm with a serene resolve, garbed in a sleek, technologically superior suit that narrates tales of distant realms and adventures. Poised Persona Profile: The individual appears as a beacon of confidence and tactical acumen. Their outfit suggests a world of progressive technology and potential space conquests. This persona might be a strategist or a guardian in a speculative fiction setting, distinguished by their steady presence and the modernistic cut of their uniform. Poised Caption: A visionary sentinel of the cosmos, poised with a calm yet assertive demeanor, dressed in an advanced, form-enhancing suit that hints at the mysteries of the universe yet to unfold. Advanced Persona Profile: The figure stands as a symbol of assurance and innovation, clad in a costume that forecasts an era of sophisticated technology and cosmic exploration. This character could represent an expert or a defender in a futuristic tale, highlighted by their tranquil posture and the elegant configuration of their attire. Advanced Caption: An unwavering pioneer of the future, the individual poses with a composed assurance, enveloped in a cutting-edge suit that speaks of advanced civilizations and uncharted frontiers.", placeholder="Enter custom instructions...", lines=2, max_lines=5)
    with gr.Accordion("π Thumbs Up Gallery", open=False):
        thumbs_up_gallery = gr.Gallery(label="Thumbs Up Image Gallery", value=load_thumbs_up_gallery(), every=5, columns=3, rows=1, object_fit="contain", height="auto")
    with gr.Row():
        # NOTE: text_display and prompt are deliberately the SAME component —
        # the analysis text doubles as the generation prompt.
        text_display = prompt = gr.Textbox(label="Image Analysis", lines=3, max_lines=10, interactive=True, placeholder="API call paused. Manually enter description.")
        output_image = gr.Image(type="pil", label="Visual Analysis Analog")
    with gr.Row():
        trigger_api_btn = gr.Button("Analyze Aesthetic!")
        combined_btn = gr.Button("Analyze & Generate")  # New combined action button
        generate_btn = gr.Button("Generate")
        analyze_all_btn = gr.Button("Analyze All Thumbs Up")
        descriptions_display = gr.Dataframe()
        # NOTE(review): this rebinds `status_display`, shadowing the
        # "Rating Status" textbox above; later wiring uses this one — confirm intended.
        status_display = gr.Textbox(label="Status", lines=10, interactive=False)
        analyze_all_btn.click(analyze_thumbs_up_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display])
    with gr.Row():
        analyze_everything_btn = gr.Button("Analyze All Images")
        # NOTE(review): analyze_all_btn receives a SECOND click handler here
        # (analyze_all_images) in addition to analyze_thumbs_up_images above;
        # both fire on every click — confirm this duplication is intended.
        analyze_all_btn.click(analyze_all_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display])
        analyze_everything_btn.click(analyze_all_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display])
    with gr.Row():
        # Three placeholder image slots (no labels, no wiring).
        gr.Image(type="pil")
        gr.Image(type="pil")
        gr.Image(type="pil")
    with gr.Accordion("Append / Prepend", open=False):
        with gr.Row():
            prepend_text = gr.Textbox(label="Prepend", lines=2, max_lines=5, placeholder="Enter text to prepend...")
            append_text = gr.Textbox(label="Append", lines=2, max_lines=5, placeholder="Enter text to append...")
def trigger_openai_api(custom_prompt):
    """Describe the image at the current browsing cursor via the OpenAI helper.

    The resulting description is persisted to the image's sidecar text file
    (empty prepend/append, overwrite enabled).

    Returns:
        tuple[str, str]: (description, status message); the description is
        empty on failure.
    """
    global current_index, file_array
    # Guard-clause style: bail out early when there is nothing to analyze.
    if current_index >= len(file_array):
        return "", "Failed to trigger OpenAI API."
    entry = file_array[current_index]
    path = find_file_by_hash(entry["hash"])
    if not path:
        return "", "Failed to trigger OpenAI API."
    with Image.open(path) as img:
        # Use the custom_prompt parameter when calling get_image_description
        description = get_image_description(img, custom_prompt)
    save_text(entry["hash"], description, "", "", True)
    return description, "Triggered OpenAI API successfully."
# --- Gradio UI definition, part 2: project management and generation settings ---
# (Indentation reconstructed from a whitespace-mangled source — TODO confirm.)
# Manual analyze button: describe the current image and show the result.
trigger_api_btn.click(trigger_openai_api, inputs=[custom_instructions], outputs=[text_display, status_display])
with gr.Row():
    delete_btn = gr.Button("ποΈ Delete")
    reset_btn = gr.Button("π Reset All Ratings ππ")
with gr.Accordion(label="Edit Project", open=False):
    with gr.Column(scale=0):
        file_input = gr.File(file_count="multiple", type="binary", label="Upload Images or Zip Files")
        with gr.Row():
            previous_session_button = gr.Button("π Previous Project")
            next_session_button = gr.Button("Next Projectπ")
        with gr.Row():
            next_img = gr.Button("πΌοΈ Show Next Image")
            prev_img = gr.Button("πΌοΈ Show Previous Image")
        with gr.Row():
            with gr.Row():
                clear_button = gr.Button("ποΈ Clear Uploads Folder")
                undo_button = gr.Button("β©οΈ Undo Last Deletion")
                process_zip_button = gr.Button("π Process Zip Files")
    with gr.Column():
        randomize_checkbox = gr.Checkbox(label="π Randomize Every 2 Seconds", value=True)
        refresh_time_input = gr.Number(label="β±οΈ Refresh Time", value=2)
        # project_index is the module-level session cursor initialized at the top of the file.
        index_display = gr.Textbox(value=str(project_index), label="π’ Session Index", every=5)
    # Wire project/session management callbacks (handlers defined elsewhere in this file).
    file_input.change(process_files, inputs=[file_input], outputs=[])
    clear_button.click(clear_uploads_folder, inputs=[], outputs=[])
    undo_button.click(undo_last_deletion, inputs=[], outputs=[])
    next_session_button.click(next_session, inputs=[], outputs=[index_display])
    previous_session_button.click(previous_session, inputs=[], outputs=[index_display])
    next_img.click(get_next_image, inputs=[], outputs=[gallery])
    prev_img.click(get_previous_image, inputs=[], outputs=[gallery])
    process_zip_button.click(process_all_zips, inputs=[], outputs=[])
with gr.Accordion("Advanced Generation Settings", open=False):
    with gr.Column():
        steps = gr.Slider(minimum=1, maximum=32, value=16, label="Steps")
        model = gr.Dropdown(choices=['TurboAnime.saftensors', 'OtherModel'], value='TurboAnime.saftensors', label="Model")
        styles = gr.CheckboxGroup(choices=['RayORender', 'OtherStyle'], value=['RayORender'], label="Styles")
        output_log = gr.Textbox(label="Log")
# Step 2: Add a button to trigger the update
generate_btn.click(fn=generate_image, inputs=[prompt, steps, model, styles], outputs=[output_image, output_log])
# Set the click action for the combined button
combined_btn.click(combined_action, inputs=[prompt, steps, model, styles, custom_instructions], outputs=[output_image, output_log, text_display])
with gr.Accordion("Training Data", open=True):
    refresh_btn = gr.Button("Refresh")
    with gr.Row():
        combined_details_df = gr.Dataframe(label="Combined Text and Image Details", value=load_combined_details(), headers=["Date Modified", "Rating Status", "File Name", "Text Content", "File Hash", "Text Path", "Image Path"], datatype=["str", "str", "str", "str", "str", "str", "str"], every=5)
        # NOTE(review): this rebinds `gallery`, shadowing the upload gallery
        # defined earlier (the next/prev handlers above already captured the
        # old component) — confirm intended.
        gallery = gr.Gallery(label="Image Gallery", value=load_gallery(), every=5)
with gr.Accordion("File Info", open=False):
    file_hash_display = gr.Textbox(label="File Hash", interactive=False)
    text_path_display = gr.Textbox(label="Text Path", interactive=False)
def delete_all_text_files_content():
    """Truncate every .txt sidecar file directly inside ``image_folder``.

    Returns:
        str: a human-readable status message for the status textbox.
    """
    for name in os.listdir(image_folder):
        if not name.endswith('.txt'):
            continue
        # Opening in 'w' mode truncates; the explicit empty write is a no-op
        # kept for clarity.
        with open(os.path.join(image_folder, name), 'w') as handle:
            handle.write('')
    return "All text files' content deleted."
# --- Gradio UI definition, part 3: maintenance buttons and navigation wiring ---
# (Indentation reconstructed from a whitespace-mangled source — TODO confirm.)
with gr.Row():
    # Add a button to manually trigger a backup of the script
    backup_btn = gr.Button("Backup Script")
    backup_btn.click(backup_script, inputs=[], outputs=[])
    delete_all_text_btn = gr.Button("Delete All Text Files Content")
    delete_all_text_btn.click(delete_all_text_files_content, inputs=[], outputs=status_display)
# Modify the button click actions to include the pause_api_call_checkbox state and
# the save_and_overwrite_changes_checkbox state as arguments. Each lambda is a
# pass-through that forwards the six component values (text, prepend, append,
# pause flag, save flag, custom instructions) to the underlying handler.
reset_btn.click(lambda t, pt, at, p, s, ci: reset_files(t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display])
prev_btn.click(lambda t, pt, at, p, s, ci: get_file(False, t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display])
next_btn.click(lambda t, pt, at, p, s, ci: get_file(True, t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display])
thumbs_up_btn.click(lambda t, pt, at, p, s, ci: move_file("up", t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display])
thumbs_down_btn.click(lambda t, pt, at, p, s, ci: move_file("down", t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display])
refresh_btn.click(refresh_all, inputs=[], outputs=[combined_details_df, gallery, thumbs_up_gallery])
# Check for successful app launch and backup the script
try:
    # NOTE(review): when run as a script, app.launch() typically blocks until
    # the server shuts down, so app_launch_success() would only run AFTER
    # shutdown rather than right after a successful launch — confirm intended.
    app.launch(share=True, server_port=7866, server_name="0.0.0.0")
    app_launch_success()
except Exception as e:
    # Broad catch is acceptable at this top-level boundary; the failure is
    # reported to stdout rather than crashing the process silently.
    print(f"Error launching the app: {e}")