Flux-Trainer / src /image_gallery.py
Daniel Jarvis
Application files V1
f4a907c
import zipfile
import tempfile
import shutil
from pathlib import Path
from PIL import Image
import uuid
import os
from src.helpers import check_webp_support
# Probe WebP capability once at import time. The flag gates the WebP -> PNG
# fallback in ImageGallery.convert_webp_if_needed.
# NOTE(review): check_webp_support is defined in src.helpers and is not visible
# here -- presumably it reports whether the installed Pillow can handle WebP;
# confirm against that helper.
WEBP_SUPPORTED = check_webp_support()
class ImageGallery:
    """Registry of gallery images backed by a private temporary directory.

    Every image handed to the gallery is copied into ``self.temp_dir`` under a
    UUID-based file name and described by a record in ``self.uploaded_images``
    (keyed by that UUID) with the keys ``path``, ``original_name``, ``width``,
    ``height``, ``size_mb``, ``index`` and ``format``.

    The public methods return ``(gallery_items, choices, info_text)`` triples:
    ``gallery_items`` is a list of ``(path, caption)`` tuples, ``choices`` is a
    list of ``(label, image_id)`` tuples and ``info_text`` holds one line per
    image.
    """

    def __init__(self):
        self.uploaded_images = {}
        self.temp_dir = tempfile.mkdtemp()

    @staticmethod
    def _format_entry(image_id, image_info):
        """Return the (gallery item, choice, info line) triple for one record."""
        img_format = image_info['format']
        format_info = f" [{img_format}]" if img_format != "Unknown" else ""
        label = f"#{image_info['index']}: {image_info['original_name']}"
        gallery_item = (image_info['path'], f"{label}{format_info}")
        choice = (
            f"{label} ({image_info['width']}x{image_info['height']}, "
            f"{image_info['size_mb']:.1f}MB){format_info}",
            image_id,
        )
        return gallery_item, choice, f"{label} - {img_format}\n"

    def _register_image(self, source_path, original_name, index):
        """Copy one file into the temp dir, record its metadata, return its id.

        Raises whatever ``shutil.copy2`` raises when the copy fails; problems
        while converting or inspecting the image are reported via ``print`` and
        recorded as "Unknown" metadata instead.
        """
        image_id = str(uuid.uuid4())
        file_extension = Path(source_path).suffix.lower()
        temp_path = os.path.join(self.temp_dir, f"{image_id}{file_extension}")
        shutil.copy2(source_path, temp_path)

        # Handle WebP conversion if needed
        processed_path, _ = self.convert_webp_if_needed(temp_path, image_id)

        # Get image info
        try:
            with Image.open(processed_path) as img:
                width, height = img.size
                img_format = img.format
            size_mb = os.path.getsize(processed_path) / (1024 * 1024)
        except Exception as e:
            # Best effort: an unreadable file is still listed, just without details.
            print(f"Error reading image info: {e}")
            width, height = "Unknown", "Unknown"
            size_mb = (
                os.path.getsize(processed_path) / (1024 * 1024)
                if os.path.exists(processed_path)
                else 0
            )
            img_format = "Unknown"

        self.uploaded_images[image_id] = {
            'path': processed_path,
            'original_name': original_name,
            'width': width,
            'height': height,
            'size_mb': size_mb,
            'index': index,
            'format': img_format,
        }
        return image_id

    @staticmethod
    def _unique_archive_name(image_info, used_names):
        """Pick an archive member name that is unique within ``used_names``.

        The name is based on ``original_name``. If the stored file carries a
        different extension (it was converted), the stored extension is used so
        the name matches the actual content. ``used_names`` holds lower-cased
        names and is updated in place.
        """
        stem, suffix = os.path.splitext(image_info['original_name'])
        stored_suffix = os.path.splitext(image_info['path'])[1]
        if stored_suffix and stored_suffix.lower() != suffix.lower():
            suffix = stored_suffix

        candidate = f"{stem}{suffix}"
        counter = 2
        # Compare case-insensitively so extraction on case-insensitive
        # filesystems cannot overwrite one member with another.
        while candidate.lower() in used_names:
            candidate = f"{stem}_{counter}{suffix}"
            counter += 1
        used_names.add(candidate.lower())
        return candidate

    def add_generated_images(self, image_paths: list, prefix: str = "Generated"):
        """Add generated images to the gallery.

        Args:
            image_paths: Paths of the generated files; missing paths are skipped.
            prefix: Prefix of the synthetic display name ``{prefix}_{NN}{ext}``.

        Returns:
            ``(gallery_items, choices, info_text)`` for the newly added images
            only. A file that cannot be copied is reported in ``info_text`` and
            does not abort the rest of the batch.
        """
        if not image_paths:
            return [], [], ""

        # Continue after the highest index in use. The record count can be lower
        # than that when an earlier upload partially failed, which would
        # otherwise produce duplicate display indices.
        current_count = max(
            (info['index'] for info in self.uploaded_images.values()),
            default=0,
        )

        image_display_paths = []
        choices = []
        info_lines = []
        for i, image_path in enumerate(image_paths, 1):
            if not image_path or not os.path.exists(image_path):
                continue

            display_index = current_count + i
            file_extension = Path(image_path).suffix.lower()
            original_name = f"{prefix}_{i:02d}{file_extension}"
            try:
                image_id = self._register_image(image_path, original_name, display_index)
            except Exception as e:
                # Same per-file isolation as upload_images.
                print(f"Error processing file {original_name}: {e}")
                info_lines.append(f"#{display_index}: {original_name} - ERROR: {e}\n")
                continue

            item, choice, line = self._format_entry(image_id, self.uploaded_images[image_id])
            image_display_paths.append(item)
            choices.append(choice)
            info_lines.append(line)

        return image_display_paths, choices, "".join(info_lines)

    def get_current_gallery_state(self):
        """Get current gallery state for updating display.

        Returns:
            ``(gallery_items, choices, info_text)`` covering every stored image,
            ordered by display index.
        """
        if not self.uploaded_images:
            return [], [], ""

        image_paths = []
        choices = []
        info_lines = []
        ordered = sorted(self.uploaded_images.items(), key=lambda entry: entry[1]['index'])
        for image_id, image_info in ordered:
            item, choice, line = self._format_entry(image_id, image_info)
            image_paths.append(item)
            choices.append(choice)
            info_lines.append(line)
        return image_paths, choices, "".join(info_lines)

    def convert_webp_if_needed(self, file_path, image_id):
        """Convert WebP to PNG if there are compatibility issues.

        Args:
            file_path: Path of the image to inspect.
            image_id: Id used to name the converted file inside the temp dir.

        Returns:
            ``(path, extension)`` of the file to use: the converted PNG when the
            image is WebP and ``WEBP_SUPPORTED`` is false, otherwise the input
            path with its own suffix. Any error is printed and the input path is
            returned unchanged.
        """
        try:
            with Image.open(file_path) as img:
                if img.format != 'WEBP' or WEBP_SUPPORTED:
                    return file_path, Path(file_path).suffix

                png_path = os.path.join(self.temp_dir, f"{image_id}_converted.png")
                # Keep the alpha channel when there is one, otherwise plain RGB.
                target_mode = 'RGBA' if img.mode in ('RGBA', 'LA') else 'RGB'
                img.convert(target_mode).save(png_path, 'PNG')
                return png_path, '.png'
        except Exception as e:
            # Deliberately best effort: fall back to the untouched file.
            print(f"Error processing image: {e}")
            return file_path, Path(file_path).suffix

    def upload_images(self, files):
        """Handle multiple image uploads with WebP support.

        Replaces the current gallery content with ``files``.

        Args:
            files: Uploaded files, each either a filesystem path (``str`` or
                ``os.PathLike``) or an object whose ``name`` attribute is the
                path. ``None`` entries are skipped.

        Returns:
            ``(gallery_items, choices, info_text)``. A file that fails to process
            gets an ``ERROR`` line in ``info_text`` and is left out of the other
            two lists.
        """
        if not files:
            return [], [], ""

        self.uploaded_images.clear()
        image_paths = []
        choices = []
        info_lines = []
        for i, file in enumerate(files, 1):
            if file is None:
                continue

            # Accept both plain paths and upload wrappers exposing ``.name``.
            source_path = file if isinstance(file, (str, os.PathLike)) else file.name
            original_name = Path(source_path).name
            try:
                image_id = self._register_image(source_path, original_name, i)
            except Exception as e:
                print(f"Error processing file {original_name}: {e}")
                # Still mention it in the info text
                info_lines.append(f"#{i}: {original_name} - ERROR: {str(e)}\n")
                continue

            item, choice, line = self._format_entry(image_id, self.uploaded_images[image_id])
            image_paths.append(item)
            choices.append(choice)
            info_lines.append(line)

        return image_paths, choices, "".join(info_lines)

    def create_zip(self, selected_images):
        """Create zip file with selected images.

        Args:
            selected_images: Iterable of image ids. Unknown ids, repeated ids and
                images whose file no longer exists are skipped.

        Returns:
            Path of the archive inside the temp dir, or ``None`` when nothing was
            selected or none of the selected images could be archived.
        """
        if not selected_images:
            return None

        zip_path = os.path.join(self.temp_dir, f"selected_images_{uuid.uuid4().hex[:8]}.zip")
        used_names = set()
        seen_ids = set()
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for image_id in selected_images:
                if image_id in seen_ids or image_id not in self.uploaded_images:
                    continue
                seen_ids.add(image_id)

                image_info = self.uploaded_images[image_id]
                if not os.path.exists(image_info['path']):
                    print(f"Error processing file {image_info['original_name']}: file is missing")
                    continue

                # Two images may share an original name; zip members must not.
                arcname = self._unique_archive_name(image_info, used_names)
                zipf.write(image_info['path'], arcname)

        if not used_names:
            # Do not hand out an empty archive.
            os.remove(zip_path)
            return None
        return zip_path