Spaces:
Runtime error
Runtime error
| """ | |
| Feedback Viewer Module | |
| This module provides functions for storing, loading, and displaying feedback | |
| entries (ratings, text, and images) kept in a Hugging Face Dataset. | |
| """ | |
| import logging | |
| import math | |
| import os | |
| from typing import List, Optional | |
| from io import BytesIO | |
| from datetime import datetime | |
| import tempfile | |
| import uuid | |
| import base64 | |
import gradio as gr
from PIL import Image
from ipadapter_model import create_image_grid

# Hugging Face integration is optional. Every Hugging Face name used by this
# module (including ``Dataset``) is imported inside the guard so that a missing
# ``datasets`` / ``huggingface_hub`` install results in
# HF_DATASETS_AVAILABLE = False rather than an ImportError at module import.
try:
    from datasets import Dataset, load_dataset  # type: ignore
    from huggingface_hub import login  # type: ignore
    HF_DATASETS_AVAILABLE = True
except ImportError:
    Dataset = None  # type: ignore
    load_dataset = None  # type: ignore
    login = None  # type: ignore
    HF_DATASETS_AVAILABLE = False
    logging.warning("Hugging Face datasets not available. Feedback viewer will not work.")

# Configuration, read once from the environment at import time. Functions in
# this module accept explicit ``dataset_repo`` / ``token`` arguments that take
# precedence over these defaults.
HF_FEEDBACK_DATASET_REPO = os.getenv("HF_FEEDBACK_DATASET_REPO", None)
HF_TOKEN = os.getenv("HF_TOKEN", None)
def store_feedback_to_hf_dataset(
    rating: str,
    feedback_text: str,
    alpha_start: float,
    alpha_end: float,
    n_steps: int,
    input1_image: Optional[Image.Image],
    input2_image: Optional[Image.Image],
    extra_images: Optional[List[Image.Image]],
    negative_images: Optional[List[Image.Image]],
    blending_result_images: Optional[List[Image.Image]],
    is_public: bool = True,
    dataset_repo: Optional[str] = None,
    token: Optional[str] = None
) -> bool:
    """
    Store one feedback entry (metadata plus images) in a Hugging Face Dataset.

    The new entry is built as a one-row dataset and appended to the existing
    ``train`` split with ``concatenate_datasets``; the combined dataset is then
    pushed back to the hub as a private dataset.

    The hub copy is only replaced by a fresh one-row dataset when loading
    reports that the dataset does not exist or is empty (``FileNotFoundError``
    and its subclasses). Any other failure while loading, migrating, or
    concatenating aborts the call *without pushing*, so a transient error can
    never overwrite previously stored feedback.

    Args:
        rating: User rating (1-5); a falsy value is stored as 0
        feedback_text: User feedback text (None is stored as "")
        alpha_start: Start alpha value used
        alpha_end: End alpha value used
        n_steps: Number of output images
        input1_image: First input image (PIL Image), required
        input2_image: Second input image (PIL Image), required
        extra_images: List of extra images (PIL Images)
        negative_images: List of negative images (PIL Images)
        blending_result_images: List of blending result images (PIL Images)
        is_public: Whether the feedback should be publicly visible (default True)
        dataset_repo: Hugging Face dataset repository (username/dataset-name);
            defaults to HF_FEEDBACK_DATASET_REPO
        token: Hugging Face token (if None, HF_TOKEN is used)

    Returns:
        True if feedback was stored successfully, False otherwise

    Raises:
        ValueError: If input1_image or input2_image is None or has size 0x0.
    """
    if not HF_DATASETS_AVAILABLE:
        logging.warning("Hugging Face datasets library not available")
        return False
    if dataset_repo is None:
        dataset_repo = HF_FEEDBACK_DATASET_REPO
    if dataset_repo is None:
        logging.warning("HF_FEEDBACK_DATASET_REPO not set. Set it to your Hugging Face username/dataset-name")
        return False

    required_inputs = (("Input 1", input1_image), ("Input 2", input2_image))

    # Both required images must be present ...
    for label, image in required_inputs:
        if image is None:
            error_msg = f"{label} image cannot be empty. Please provide a valid image."
            logging.error(error_msg)
            raise ValueError(error_msg)
    # ... and must not be degenerate 0x0 images.
    for label, image in required_inputs:
        if hasattr(image, 'size') and image.size == (0, 0):
            error_msg = f"{label} image is empty (0x0 size). Please provide a valid image."
            logging.error(error_msg)
            raise ValueError(error_msg)

    try:
        # Type guards - these should not be None if HF_DATASETS_AVAILABLE is True
        if Dataset is None or load_dataset is None or login is None:
            logging.error("Hugging Face datasets libraries not properly imported")
            return False
        from datasets import Features, Image as ImageFeature, Value, Sequence, concatenate_datasets

        # Explicit token wins over the environment default.
        if token is None:
            token = HF_TOKEN
        if token:
            login(token=token, add_to_git_credential=True)

        # Target schema for every stored entry.
        features = Features({
            "uuid": Value("string"),
            "timestamp": Value("string"),
            "rating": Value("int64"),
            "feedback": Value("string"),
            "alpha_start": Value("float64"),
            "alpha_end": Value("float64"),
            "n_steps": Value("int64"),
            "is_public": Value("bool"),  # Whether feedback is publicly visible
            "input1": ImageFeature(),
            "input2": ImageFeature(),
            "extra_images": Sequence(ImageFeature()),
            "negative_images": Sequence(ImageFeature()),
            "blending_results": Sequence(ImageFeature()),  # List of blending result images
        })

        def prepare_image(img):
            """Return img as an RGB PIL image, or None if it is not a PIL image."""
            if isinstance(img, Image.Image):
                return img if img.mode == "RGB" else img.convert("RGB")
            return None

        def prepare_images(images):
            """Convert each image once and drop anything that is not a PIL image."""
            prepared = (prepare_image(img) for img in (images or []))
            return [img for img in prepared if img is not None]

        new_uuid = str(uuid.uuid4())
        logging.info(f"Generated UUID for new feedback entry: {new_uuid}")

        # PIL images are passed directly; the Image feature encodes them.
        feedback_entry = {
            "uuid": new_uuid,
            "timestamp": datetime.now().isoformat(),
            "rating": int(rating) if rating else 0,
            "feedback": feedback_text or "",
            "alpha_start": float(alpha_start),
            "alpha_end": float(alpha_end),
            "n_steps": int(n_steps),
            "is_public": bool(is_public),
            "input1": prepare_image(input1_image),
            "input2": prepare_image(input2_image),
            "extra_images": prepare_images(extra_images),
            "negative_images": prepare_images(negative_images),
            "blending_results": prepare_images(blending_result_images),
        }
        new_entry_dataset = Dataset.from_list([feedback_entry], features=features)

        # Only a "not found / empty" result may start a brand-new dataset.
        # Network errors and the like propagate to the outer handler so that
        # existing hub data is never overwritten by a single-row dataset.
        try:
            existing_dataset = load_dataset(dataset_repo, split="train")
        except FileNotFoundError as e:
            logging.info(f"Dataset not found or empty, creating new one: {e}")
            existing_dataset = None

        if existing_dataset is None:
            combined_dataset = new_entry_dataset
        else:
            logging.info(f"Loaded existing dataset with {len(existing_dataset)} entries")

            # Backfill columns introduced after the first entries were stored.
            if "uuid" not in existing_dataset.column_names:
                logging.info("Existing dataset missing UUID field, adding UUIDs to existing entries")

                def add_uuid(example):
                    if not example.get("uuid"):
                        example["uuid"] = str(uuid.uuid4())
                    return example

                existing_dataset = existing_dataset.map(add_uuid)
            if "is_public" not in existing_dataset.column_names:
                logging.info("Existing dataset missing is_public field, adding is_public=True to existing entries")

                def add_is_public(example):
                    # Entries stored before the flag existed are treated as public.
                    if "is_public" not in example:
                        example["is_public"] = True
                    return example

                existing_dataset = existing_dataset.map(add_is_public)

            # Align the old rows with the target schema. A failed cast is only
            # a warning: concatenation below decides whether the schemas are
            # actually compatible, and raises (aborting the store) if not.
            try:
                existing_dataset = existing_dataset.cast(features)
            except Exception as cast_error:
                logging.warning(f"Could not cast existing dataset schema: {cast_error}. Attempting to proceed anyway.")

            # Existing entries stay untouched, the new entry is appended.
            combined_dataset = concatenate_datasets([existing_dataset, new_entry_dataset])
            logging.info(f"Combined dataset has {len(combined_dataset)} entries")

        try:
            combined_dataset = combined_dataset.cast(features)
        except Exception as cast_err:
            logging.warning(f"Could not cast combined dataset to features schema: {cast_err}")

        # Sanity check: the appended row must be the last one.
        if len(combined_dataset) > 0:
            last_entry = combined_dataset[-1]
            if last_entry.get("uuid") == new_uuid:
                logging.info(f"Verified UUID {new_uuid} is present in the new entry")
            else:
                logging.warning(f"UUID mismatch! Expected {new_uuid}, got {last_entry.get('uuid')}")

        combined_dataset.push_to_hub(
            dataset_repo,
            private=True,
            token=token
        )
        logging.info(f"Feedback stored successfully to {dataset_repo} with UUID: {new_uuid}")
        return True
    except Exception as e:
        # logging.exception records the traceback alongside the message.
        logging.exception(f"Error storing feedback to Hugging Face Dataset: {e}")
        return False
def _ensure_rgb(img):
    """Return img unchanged if it is already RGB, otherwise an RGB conversion of it."""
    return img if img.mode == "RGB" else img.convert("RGB")


def _decode_dataset_image_object(obj):
    """Best-effort extraction of a PIL image from a ``datasets`` image wrapper object.

    Each strategy is tried in turn; a strategy that raises is logged at debug
    level and skipped. Returns an RGB PIL image, or None if nothing worked.
    """
    attempts = (
        # An ``.image`` attribute holding the PIL image.
        lambda: getattr(obj, "image", None),
        # The object is callable and returns the image.
        lambda: obj() if callable(obj) else None,
        # The object exposes a PIL-like ``convert``.
        lambda: obj.convert("RGB") if hasattr(obj, "convert") else None,
        # Raw encoded bytes.
        lambda: Image.open(BytesIO(obj.bytes)) if getattr(obj, "bytes", None) else None,
        # A path that exists on the local file system.
        lambda: (
            Image.open(obj.path)
            if getattr(obj, "path", None) and os.path.exists(obj.path)
            else None
        ),
        # Sequence-like wrapper whose first element is the image.
        lambda: obj[0] if hasattr(obj, "__getitem__") and len(obj) > 0 else None,
    )
    for attempt in attempts:
        try:
            candidate = attempt()
            if isinstance(candidate, Image.Image):
                return _ensure_rgb(candidate)
        except Exception as e:
            logging.debug(f"Error converting DatasetImage: {e}")
    return None


def convert_dataset_image_to_pil(image_data):
    """
    Convert image data from Hugging Face Dataset format to an RGB PIL Image.

    Accepted inputs:
    - PIL Image (returned as-is when already RGB)
    - datasets.Image-like wrapper object
    - dict with 'image' key (nested value, converted recursively)
    - dict with 'bytes' and/or 'path' keys; non-empty bytes are preferred and
      the path is used only when no bytes are available or they fail to decode
    - dict wrapping the image under 'pil', 'PIL', 'img' or 'image_data'
    - str (file path)
    - None or "" (returns None)

    Returns:
        An RGB PIL Image, or None when the input is empty, unreadable, or of
        an unrecognized type. Failures are logged, never raised.
    """
    if image_data is None:
        return None
    if isinstance(image_data, str) and not image_data:
        return None

    # Already a PIL Image
    if isinstance(image_data, Image.Image):
        return _ensure_rgb(image_data)

    # datasets.Image-like wrapper objects (matched by type or by class name).
    try:
        from datasets import Image as DatasetImage
    except ImportError:
        DatasetImage = None
    type_name = str(type(image_data))
    is_dataset_image = (
        (DatasetImage is not None and isinstance(image_data, DatasetImage))
        or ('Image' in type_name and 'datasets' in type_name)
    )
    if is_dataset_image:
        pil_img = _decode_dataset_image_object(image_data)
        if pil_img is None:
            logging.debug(f"Could not convert DatasetImage object (type: {type(image_data)}, methods tried)")
        return pil_img

    if isinstance(image_data, dict):
        # Nested image value
        if 'image' in image_data:
            return convert_dataset_image_to_pil(image_data['image'])

        raw_bytes = image_data.get('bytes')
        path = image_data.get('path')

        # Undecoded entries commonly carry both keys with one of them None,
        # so embedded bytes are tried first and the path is only a fallback.
        if raw_bytes:
            try:
                return Image.open(BytesIO(raw_bytes)).convert("RGB")
            except Exception as e:
                logging.warning(f"Could not load image from bytes: {e}")
                if not path:
                    return None
        if path:
            try:
                return Image.open(path).convert("RGB")
            except Exception as e:
                logging.warning(f"Could not load image from path {path}: {e}")
                return None
        if 'bytes' in image_data or 'path' in image_data:
            # Recognized keys were present but carried no usable data.
            logging.debug("Image dict has neither usable bytes nor a usable path")
            return None

        # Some datasets wrap the image under another key.
        for key in ['pil', 'PIL', 'img', 'image_data']:
            if key in image_data:
                return convert_dataset_image_to_pil(image_data[key])

        # Last resort: a single string value that might be an image path.
        if len(image_data) == 1:
            value = next(iter(image_data.values()))
            if isinstance(value, str):
                return convert_dataset_image_to_pil(value)

    if isinstance(image_data, str):
        # A bare relative name that does not exist locally cannot be resolved
        # here; the dataset is expected to hand over decoded images instead.
        if not os.path.exists(image_data) and not os.path.isabs(image_data):
            logging.debug(f"Image filename '{image_data}' not found locally. It should be loaded as Image object from dataset.")
            return None
        try:
            return Image.open(image_data).convert("RGB")
        except Exception as e:
            logging.warning(f"Could not load image from string '{image_data}': {e}")
            return None

    # If we get here, we don't know how to convert it
    logging.warning(f"Unknown image format: {type(image_data)}")
    return None
def create_image_grid_for_entry(input1, input2, result):
    """Build a single-row strip of the available images: input1 | input2 | result.

    Missing (falsy) images are skipped. Every remaining image is scaled to the
    height of the tallest one, keeping its aspect ratio, and the scaled images
    are pasted left to right onto a new RGB canvas.

    Returns:
        The combined PIL image, or None when no image was supplied.
    """
    panels = [img for img in (input1, input2, result) if img]
    if not panels:
        return None

    # All panels share the height of the tallest image.
    target_height = max(img.height for img in panels)
    scaled = [
        img.resize(
            (int(target_height * (img.width / img.height)), target_height),
            Image.Resampling.LANCZOS,
        )
        for img in panels
    ]

    # Lay the panels out side by side.
    canvas = Image.new("RGB", (sum(img.width for img in scaled), target_height))
    cursor = 0
    for panel in scaled:
        canvas.paste(panel, (cursor, 0))
        cursor += panel.width
    return canvas
def pil_image_to_base64(img: Optional[Image.Image], max_size: int = 400) -> str:
    """Convert a PIL Image to a base64 PNG data URI for HTML embedding.

    Args:
        img: Image to encode; None yields an empty string.
        max_size: Upper bound for the longer side. Larger images are scaled
            down (aspect ratio preserved) before encoding.

    Returns:
        A ``data:image/png;base64,...`` string, or "" when there is no image
        to encode (None or an image with zero width or height).
    """
    if img is None:
        return ""
    # A zero-area image has no aspect ratio and nothing to render.
    if img.width <= 0 or img.height <= 0:
        return ""

    # Resize if too large
    if max(img.width, img.height) > max_size:
        aspect_ratio = img.width / img.height
        # The shorter side is clamped to 1 px: for extreme aspect ratios the
        # truncated value would be 0, which resize() rejects.
        if img.width > img.height:
            new_width = max_size
            new_height = max(1, int(max_size / aspect_ratio))
        else:
            new_height = max_size
            new_width = max(1, int(max_size * aspect_ratio))
        img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)

    # PNG output is always produced from an RGB image.
    if img.mode != "RGB":
        img = img.convert("RGB")

    buffered = BytesIO()
    img.save(buffered, format="PNG")
    img_str = base64.b64encode(buffered.getvalue()).decode()
    return f"data:image/png;base64,{img_str}"
def load_feedback_from_hf_dataset(
    dataset_repo: Optional[str] = None,
    token: Optional[str] = None,
    limit: Optional[int] = None,
    reverse: bool = False,
    public_only: bool = True
) -> List[dict]:
    """
    Load feedback entries from a Hugging Face Dataset.

    The ``train`` split is loaded, its image columns are cast to image
    features, and every row is turned into a plain dict whose image fields
    hold PIL images. Rows are ordered by their ``timestamp`` field.

    Args:
        dataset_repo: Hugging Face dataset repository (username/dataset-name);
            defaults to HF_FEEDBACK_DATASET_REPO
        token: Hugging Face token (if None, HF_TOKEN is used)
        limit: Maximum number of entries to return (None for all); applied
            after sorting
        reverse: If True, newest entries come first. Default False (oldest first).
        public_only: If True, only return public feedback entries. Default True.
            Entries without an is_public field are treated as public.

    Returns:
        List of feedback entries as dictionaries; an empty list when the
        library, repository, or dataset is unavailable.
    """
    if not HF_DATASETS_AVAILABLE:
        logging.warning("Hugging Face datasets library not available")
        return []

    repo = HF_FEEDBACK_DATASET_REPO if dataset_repo is None else dataset_repo
    if repo is None:
        logging.warning("HF_FEEDBACK_DATASET_REPO not set")
        return []

    try:
        # Type guards
        if load_dataset is None or login is None:
            logging.error("Hugging Face datasets libraries not properly imported")
            return []

        # Explicit token wins over the environment default.
        auth_token = HF_TOKEN if token is None else token
        if auth_token:
            login(token=auth_token, add_to_git_credential=True)

        try:
            from datasets import Features, Image as ImageFeature, Value, Sequence

            dataset = load_dataset(repo, split="train")

            # Cast image columns so rows yield decoded images rather than raw
            # storage references. Failure here is tolerated.
            try:
                current_features = dataset.features
                logging.debug(f"Dataset features before casting: {list(current_features.keys())}")
                for single_col in ("input1", "input2"):
                    if single_col in dataset.column_names:
                        dataset = dataset.cast_column(single_col, ImageFeature())
                if "blending_results" in dataset.column_names:
                    dataset = dataset.cast_column("blending_results", Sequence(ImageFeature()))
                elif "blending_result" in dataset.column_names:
                    # Old single-image column
                    dataset = dataset.cast_column("blending_result", ImageFeature())
                for list_col in ("extra_images", "negative_images"):
                    if list_col in dataset.column_names:
                        dataset = dataset.cast_column(list_col, Sequence(ImageFeature()))
                logging.debug(f"Successfully cast image columns to ImageFeature")
            except Exception as e:
                logging.warning(f"Could not cast image columns (may already be ImageFeature or incompatible format): {e}")

            def decode_all(items):
                """Convert each item to a PIL image, dropping the ones that fail."""
                decoded = (convert_dataset_image_to_pil(item) for item in items)
                return [img for img in decoded if img is not None]

            data = []
            for idx, entry in enumerate(dataset):
                # Pull the raw image values out of the row first.
                try:
                    input1_raw = entry.get("input1")
                    input2_raw = entry.get("input2")
                    # New rows store a list, old rows a single image.
                    blending_results_raw = entry.get("blending_results", [])
                    blending_result_raw = entry.get("blending_result", None)
                    extra_images_raw = entry.get("extra_images", [])
                    negative_images_raw = entry.get("negative_images", [])
                except Exception as e:
                    logging.debug(f"Error accessing image fields in entry {idx}: {e}")
                    input1_raw, input2_raw = None, None
                    blending_results_raw, blending_result_raw = [], None
                    extra_images_raw, negative_images_raw = [], []

                entry_dict = dict(entry)

                # Old rows may lack these fields: give them a fresh UUID and
                # treat them as public.
                if not entry_dict.get("uuid"):
                    entry_dict["uuid"] = str(uuid.uuid4())
                entry_dict.setdefault("is_public", True)

                if public_only and not entry_dict.get("is_public", True):
                    continue

                # Replace the raw image values with PIL images.
                try:
                    entry_dict["input1"] = convert_dataset_image_to_pil(input1_raw)
                    entry_dict["input2"] = convert_dataset_image_to_pil(input2_raw)

                    if blending_results_raw:
                        entry_dict["blending_results"] = decode_all(blending_results_raw)
                    elif blending_result_raw:
                        # Old format: wrap the single image in a list.
                        single_img = convert_dataset_image_to_pil(blending_result_raw)
                        entry_dict["blending_results"] = [single_img] if single_img else []
                    else:
                        entry_dict["blending_results"] = []

                    entry_dict["extra_images"] = decode_all(extra_images_raw) if extra_images_raw else []
                    entry_dict["negative_images"] = decode_all(negative_images_raw) if negative_images_raw else []
                except Exception as e:
                    logging.warning(f"Error converting images in entry {idx}: {e}")
                    # On failure the entry is kept, with all image fields blanked.
                    entry_dict["input1"] = None
                    entry_dict["input2"] = None
                    entry_dict["blending_results"] = []
                    entry_dict["extra_images"] = []
                    entry_dict["negative_images"] = []

                data.append(entry_dict)

            def get_timestamp(entry):
                """Sort key: POSIX timestamp of the entry, or 0 if missing/unparseable."""
                timestamp_str = entry.get("timestamp", "")
                if not timestamp_str:
                    return 0
                try:
                    # fromisoformat is given an explicit offset instead of "Z".
                    if timestamp_str.endswith("Z"):
                        timestamp_str = timestamp_str[:-1] + "+00:00"
                    return datetime.fromisoformat(timestamp_str).timestamp()
                except (ValueError, AttributeError) as e:
                    logging.debug(f"Could not parse timestamp '{timestamp_str}': {e}")
                    return 0

            # Oldest first, then flipped as a whole when newest-first is wanted.
            data.sort(key=get_timestamp)
            if reverse:
                data.reverse()

            return data if limit is None else data[:limit]
        except Exception as e:
            logging.warning(f"Could not load dataset: {e}")
            return []
    except Exception as e:
        logging.error(f"Error loading feedback from Hugging Face Dataset: {e}")
        return []
| def create_feedback_viewer_tab(): | |
| """Create the feedback viewer tab interface.""" | |
| with gr.Tab("Feedback Viewer"): | |
| gr.Markdown(""" | |
| ## Feedback Viewer | |
| View submitted feedback and images from users. | |
| """) | |
| # Top controls group | |
| with gr.Group(): | |
| refresh_button = gr.Button("🔄 Refresh Feedback", variant="primary") | |
| # Main feedback display | |
| feedback_html = gr.HTML( | |
| label="Feedback Entries", | |
| value="<p>Click 'Refresh Feedback' to load entries.</p>" | |
| ) | |
| # Pagination controls group | |
| with gr.Group(): | |
| gr.Markdown("### Pagination") | |
| with gr.Row(): | |
| items_per_page_slider = gr.Slider( | |
| minimum=1, | |
| maximum=50, | |
| step=1, | |
| value=10, | |
| label="Items per page", | |
| scale=2 | |
| ) | |
| with gr.Column(scale=3): | |
| with gr.Row(): | |
| prev_page_button = gr.Button("◀ Previous", variant="secondary", scale=1) | |
| page_number = gr.Number( | |
| value=1, | |
| minimum=1, | |
| step=1, | |
| label="Page", | |
| precision=0, | |
| scale=2 | |
| ) | |
| next_page_button = gr.Button("Next ▶", variant="secondary", scale=1) | |
| total_pages_display = gr.Markdown("**Total Pages:** -") | |
| # Search controls group | |
| with gr.Group(): | |
| gr.Markdown("### Search & Filter") | |
| with gr.Row(): | |
| uuid_search_input = gr.Textbox( | |
| label="Search by UUID", | |
| placeholder="Enter UUID to search (leave empty to show all)", | |
| value="", | |
| scale=3 | |
| ) | |
| search_button = gr.Button("🔍 Search", variant="secondary", scale=1) | |
| with gr.Row(): | |
| timestamp_start_input = gr.Textbox( | |
| label="Start Timestamp", | |
| placeholder="YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS", | |
| value="", | |
| scale=1 | |
| ) | |
| timestamp_end_input = gr.Textbox( | |
| label="End Timestamp", | |
| placeholder="YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS", | |
| value="", | |
| scale=1 | |
| ) | |
| rating_filter = gr.Dropdown( | |
| choices=["All", "1", "2", "3", "4", "5"], | |
| value="All", | |
| label="Rating Filter", | |
| scale=1 | |
| ) | |
| with gr.Row(): | |
| filter_extra_images = gr.Checkbox( | |
| label="Only entries with extra images", | |
| value=False | |
| ) | |
| filter_negative_images = gr.Checkbox( | |
| label="Only entries with negative images", | |
| value=False | |
| ) | |
| sort_order_radio = gr.Radio( | |
| choices=["Old to New", "New to Old"], | |
| value="Old to New", | |
| label="Sort Order", | |
| scale=2 | |
| ) | |
| selected_details = gr.JSON(label="Full Feedback Details", visible=False) | |
| # Admin section - hidden behind accordion | |
| with gr.Accordion("⋮", open=False): | |
| gr.Markdown("### Admin Options") | |
| with gr.Row(): | |
| admin_password_input = gr.Textbox( | |
| label="Admin Password", | |
| placeholder="Enter admin password", | |
| type="password", | |
| value="", | |
| scale=2 | |
| ) | |
| include_private_checkbox = gr.Checkbox( | |
| label="Include private feedbacks", | |
| value=False, | |
| interactive=False, | |
| scale=1 | |
| ) | |
| verify_password_button = gr.Button("🔓 Verify", variant="secondary", scale=1) | |
| admin_status = gr.Markdown("") | |
| gr.Markdown("### Delete Entry") | |
| with gr.Row(): | |
| delete_uuid_input = gr.Textbox( | |
| label="UUID to Delete", | |
| placeholder="Enter UUID or prefix (e.g. e7132a33)", | |
| value="", | |
| scale=3 | |
| ) | |
| delete_button = gr.Button("🗑️ Delete", variant="stop", scale=1) | |
| delete_status = gr.Markdown("") | |
def verify_admin_password(password: str, current_include_private: bool):
    """Check the admin password and enable or disable the private-feedback checkbox.

    The expected password is taken from the ``FEEDBACK_ADMIN_PASSWORD``
    environment variable. When that variable is unset or empty, the historical
    default ``"admin"`` is used, so existing deployments behave as before.

    Args:
        password: Text typed into the admin password box. ``None`` or any
            non-string value is treated as an empty password.
        current_include_private: Current state of the "Include private
            feedbacks" checkbox. Unused; it is accepted because the click
            handler wires the checkbox in as a second input.

    Returns:
        A ``(checkbox_update, status_markdown)`` tuple: the checkbox is
        checked and made interactive on success, and unchecked and locked
        on failure.
    """
    import hmac

    expected = os.getenv("FEEDBACK_ADMIN_PASSWORD") or "admin"
    supplied = password if isinstance(password, str) else ""

    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and a
    # constant-time comparison avoids leaking how much of the password matched.
    granted = hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))

    if granted:
        return (
            gr.update(value=True, interactive=True),  # Enable and check the checkbox
            "✅ Admin access granted. You can now view private feedbacks."
        )
    return (
        gr.update(value=False, interactive=False),  # Disable and uncheck the checkbox
        "❌ Invalid password. Private feedbacks hidden."
    )
| verify_password_button.click( | |
| verify_admin_password, | |
| inputs=[admin_password_input, include_private_checkbox], | |
| outputs=[include_private_checkbox, admin_status] | |
| ) | |
def delete_entry_by_uuid(uuid_to_delete: str, admin_password: str):
    """Delete one feedback entry, identified by UUID or unique UUID prefix.

    The admin password is checked first (``FEEDBACK_ADMIN_PASSWORD``
    environment variable, falling back to the historical default ``"admin"``).
    The dataset is then loaded, the single matching row is dropped and the
    result is pushed back to ``HF_FEEDBACK_DATASET_REPO`` as a private dataset.

    Only the ``uuid`` column is read to find the match, and the surviving rows
    are kept with ``Dataset.select`` so their stored images are not decoded
    and re-encoded.

    Args:
        uuid_to_delete: Full UUID or a prefix of it (case-insensitive).
        admin_password: Password typed into the admin password box.

    Returns:
        A human-readable status string starting with ✅ on success or ❌ on
        any failure. Exceptions are logged and reported in the string, never
        raised to the caller.
    """
    import hmac
    import uuid as uuid_module

    # Check password (bytes + compare_digest: constant-time, non-ASCII safe).
    expected_password = os.getenv("FEEDBACK_ADMIN_PASSWORD") or "admin"
    supplied_password = admin_password if isinstance(admin_password, str) else ""
    if not hmac.compare_digest(
        supplied_password.encode("utf-8"), expected_password.encode("utf-8")
    ):
        return "❌ Invalid admin password. Enter password and click Verify first."

    if not uuid_to_delete or not uuid_to_delete.strip():
        return "❌ Please enter a UUID to delete."
    uuid_to_delete = uuid_to_delete.strip()

    try:
        if not HF_DATASETS_AVAILABLE or load_dataset is None or login is None:
            return "❌ Hugging Face datasets library not available."
        if HF_TOKEN:
            login(token=HF_TOKEN, add_to_git_credential=True)
        if not HF_FEEDBACK_DATASET_REPO:
            return "❌ HF_FEEDBACK_DATASET_REPO not configured."

        # Load dataset
        dataset = load_dataset(HF_FEEDBACK_DATASET_REPO, split="train")
        original_count = len(dataset)
        if original_count == 0:
            return "❌ Dataset is empty. Nothing to delete."

        # Read just the uuid column; a missing column or null cell counts as "".
        if "uuid" in dataset.column_names:
            all_uuids = [
                str(value) if value is not None else "" for value in dataset["uuid"]
            ]
        else:
            all_uuids = [""] * original_count

        # Prefix match also covers the exact-match case.
        search_term = uuid_to_delete.lower()
        matching_indices = [
            index
            for index, entry_uuid in enumerate(all_uuids)
            if entry_uuid.lower().startswith(search_term)
        ]

        if not matching_indices:
            return f"❌ No UUID matching '{uuid_to_delete}' found in dataset."
        if len(matching_indices) > 1:
            matching_entries = [all_uuids[index] for index in matching_indices]
            matches_display = ", ".join([u[:12] + "..." for u in matching_entries[:5]])
            if len(matching_entries) > 5:
                matches_display += f" (+{len(matching_entries) - 5} more)"
            return f"❌ Multiple UUIDs match '{uuid_to_delete}': {matches_display}. Please be more specific."

        # Exactly one match - use the full UUID
        target_index = matching_indices[0]
        uuid_to_delete_full = all_uuids[target_index]
        keep_indices = [index for index in range(original_count) if index != target_index]

        # Handle empty dataset case
        if not keep_indices:
            # Create placeholder entry to avoid empty dataset issues
            deleted_entry = dataset[target_index]
            placeholder = {}
            for key in dataset.features.keys():
                if key == "timestamp":
                    placeholder[key] = datetime.now().isoformat()
                elif key == "rating":
                    placeholder[key] = 0
                elif key == "feedback":
                    placeholder[key] = "[PLACEHOLDER - This entry can be deleted]"
                elif key in ["alpha_start", "alpha_end"]:
                    placeholder[key] = 0.0
                elif key == "n_steps":
                    placeholder[key] = 0
                elif key in ["input1", "input2"]:
                    # NOTE(review): the placeholder keeps the deleted entry's
                    # input images, so they remain in the dataset after the
                    # "delete". Confirm whether None is acceptable here.
                    placeholder[key] = deleted_entry.get(key)
                elif key in ["extra_images", "negative_images", "blending_results", "blending_result"]:
                    placeholder[key] = []
                elif key == "uuid":
                    placeholder[key] = str(uuid_module.uuid4())
                else:
                    # Unknown columns reuse the deleted row's value so the
                    # placeholder still fits the declared feature types.
                    placeholder[key] = deleted_entry.get(key, "")
            new_dataset = Dataset.from_list([placeholder], features=dataset.features)
            new_dataset.push_to_hub(
                HF_FEEDBACK_DATASET_REPO,
                private=True,
                token=HF_TOKEN
            )
            return f"✅ Deleted entry with UUID '{uuid_to_delete_full}'. Dataset now has 1 placeholder entry."

        # Keep the remaining rows as stored (no image decode/re-encode).
        new_dataset = dataset.select(keep_indices)

        # Push to hub
        new_dataset.push_to_hub(
            HF_FEEDBACK_DATASET_REPO,
            private=True,
            token=HF_TOKEN
        )
        return f"✅ Successfully deleted entry with UUID '{uuid_to_delete_full}'. {len(keep_indices)} entries remaining."
    except Exception as e:
        # UI boundary: log with traceback and report instead of raising.
        logging.exception("Error deleting entry: %s", e)
        return f"❌ Error deleting entry: {str(e)}"
| delete_button.click( | |
| delete_entry_by_uuid, | |
| inputs=[delete_uuid_input, admin_password_input], | |
| outputs=[delete_status] | |
| ) | |
def parse_timestamp(timestamp_str):
    """Parse a timestamp string into a naive ``datetime``.

    Accepted inputs are ``YYYY-MM-DD``, ``YYYY-MM-DD[T ]HH:MM:SS`` with
    optional fractional seconds, and any other form understood by
    ``datetime.fromisoformat`` (a trailing ``Z`` is accepted as UTC).

    Timestamps that carry a UTC offset are converted to UTC and returned
    without tzinfo. Every result is therefore naive, so callers can compare
    results with ``<`` / ``>`` without hitting "can't compare offset-naive
    and offset-aware datetimes".

    Args:
        timestamp_str: Text to parse. A ``datetime`` is passed through
            (normalised the same way); any other non-string yields ``None``.

    Returns:
        The parsed naive ``datetime``, or ``None`` when the input is empty,
        blank or not in a recognised format.
    """
    from datetime import timezone

    def _to_naive_utc(value):
        # Naive values are returned untouched; aware ones are shifted to UTC.
        if value.utcoffset() is None:
            return value
        return value.astimezone(timezone.utc).replace(tzinfo=None)

    if isinstance(timestamp_str, datetime):
        return _to_naive_utc(timestamp_str)
    if not isinstance(timestamp_str, str):
        return None

    text = timestamp_str.strip()
    if not text:
        return None

    # Try the explicit formats first
    formats = (
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.%f",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y-%m-%d",
    )
    for fmt in formats:
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue

    # Fall back to ISO parsing; older Pythons do not accept the "Z" suffix.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    return _to_naive_utc(parsed)
def load_and_display_feedback(items_per_page, page, sort_order, uuid_search="", timestamp_start="", timestamp_end="", rating_filter="All", filter_extra=False, filter_negative=False, include_private=False):
    """Load feedback from the dataset and render one page of it as an HTML table.

    All stored text placed into the markup (feedback text, UUID, timestamp,
    rating, alpha and step values) is HTML-escaped. Feedback text is
    user-supplied and would otherwise be rendered as raw HTML in the viewer.

    Args:
        items_per_page: Rows per page; coerced to an int of at least 1.
        page: Requested 1-based page number. It is clamped to the valid range;
            ``None`` or a non-numeric value is treated as page 1.
        sort_order: ``"New to Old"`` requests reversed order from the loader;
            any other value requests the loader's default order.
        uuid_search: Case-insensitive UUID prefix; blank disables the filter.
        timestamp_start: Inclusive lower bound, parsed with ``parse_timestamp``.
        timestamp_end: Inclusive upper bound, parsed with ``parse_timestamp``.
            When either bound parses, entries whose own timestamp is missing
            or unparseable are dropped.
        rating_filter: ``"All"`` or an integer given as text; a value that is
            not an integer is ignored.
        filter_extra: Keep only entries that have extra images.
        filter_negative: Keep only entries that have negative images.
        include_private: When true, the loader is called with
            ``public_only=False``.

    Returns:
        ``(html, page, total_pages_markdown)``: the rendered table, the page
        actually shown, and a Markdown string with the page count. When
        nothing matches, a short "no entries" paragraph, page 1 and a page
        count of 0 are returned.
    """
    import json
    from html import escape

    def _to_pil_list(raw_images):
        # Convert stored images to PIL, silently dropping ones that fail.
        pil_images = []
        for raw in raw_images or []:
            converted = convert_dataset_image_to_pil(raw)
            if converted:
                pil_images.append(converted)
        return pil_images

    def _gallery_html(images, alt_text):
        # Thumbnail strip for the extra / negative image columns.
        if not images:
            return '<div class="no-image">None</div>'
        thumbs = "".join(
            f'<img src="{pil_image_to_base64(img, max_size=512)}" alt="{alt_text}" />'
            for img in images
        )
        return f'<div class="image-gallery">{thumbs}</div>'

    # Convert radio selection to reverse boolean
    sort_reverse = (sort_order == "New to Old")

    # Load all feedback entries (no limit, we paginate ourselves).
    all_feedbacks = load_feedback_from_hf_dataset(reverse=sort_reverse, public_only=not include_private)

    # Filter by UUID prefix if a search term was provided
    has_uuid_search = bool(uuid_search and uuid_search.strip())
    if has_uuid_search:
        search_term = uuid_search.strip().lower()
        all_feedbacks = [
            fb for fb in all_feedbacks
            if str(fb.get("uuid") or "").lower().startswith(search_term)
        ]

    # Filter by timestamp range if provided
    start_dt = parse_timestamp(timestamp_start)
    end_dt = parse_timestamp(timestamp_end)
    if start_dt or end_dt:
        in_range = []
        for fb in all_feedbacks:
            fb_dt = parse_timestamp(fb.get("timestamp", ""))
            if not fb_dt:
                continue
            if start_dt and fb_dt < start_dt:
                continue
            if end_dt and fb_dt > end_dt:
                continue
            in_range.append(fb)
        all_feedbacks = in_range

    # Filter by rating if specified
    if rating_filter and rating_filter != "All":
        try:
            rating_value = int(rating_filter)
        except (ValueError, TypeError):
            rating_value = None  # Invalid rating filter, ignore it
        if rating_value is not None:
            all_feedbacks = [
                fb for fb in all_feedbacks
                if fb.get("rating", 0) == rating_value
            ]

    # Filter by extra / negative images if the checkboxes are checked
    if filter_extra:
        all_feedbacks = [fb for fb in all_feedbacks if fb.get("extra_images")]
    if filter_negative:
        all_feedbacks = [fb for fb in all_feedbacks if fb.get("negative_images")]

    if not all_feedbacks:
        if has_uuid_search:
            gr.Info(f"No feedback entries found matching UUID: {uuid_search}")
        elif start_dt or end_dt:
            gr.Info("No feedback entries found in the specified timestamp range.")
        elif rating_filter and rating_filter != "All":
            gr.Info(f"No feedback entries found with rating: {rating_filter}")
        else:
            gr.Info("No feedback entries found. Make sure HF_FEEDBACK_DATASET_REPO is configured.")
        return "<p>No feedback entries found.</p>", 1, "**Total Pages:** 0"

    # Calculate pagination
    total_items = len(all_feedbacks)
    items_per_page = max(1, int(items_per_page))
    total_pages = max(1, (total_items + items_per_page - 1) // items_per_page)
    try:
        requested_page = int(page)
    except (TypeError, ValueError):
        # A cleared Page field arrives as None.
        requested_page = 1
    page = max(1, min(requested_page, total_pages))

    # Get the slice of feedbacks for the current page
    start_idx = (page - 1) * items_per_page
    end_idx = start_idx + items_per_page
    feedbacks = all_feedbacks[start_idx:end_idx]

    # Start building HTML table
    html_parts = ["""
    <style>
    .feedback-table {
    width: 100%;
    border-collapse: collapse;
    margin: 20px 0;
    font-family: Arial, sans-serif;
    table-layout: fixed;
    }
    .feedback-table th {
    background-color: #4a5568;
    color: white;
    padding: 12px;
    text-align: left;
    border: 1px solid #2d3748;
    font-weight: bold;
    }
    .feedback-table th:nth-child(1) {
    width: 20%;
    }
    .feedback-table th:nth-child(2) {
    width: 35%;
    }
    .feedback-table th:nth-child(3) {
    width: 15%;
    }
    .feedback-table th:nth-child(4) {
    width: 15%;
    }
    .feedback-table th:nth-child(5) {
    width: 15%;
    }
    .feedback-table td {
    padding: 10px;
    border: 1px solid #e2e8f0;
    vertical-align: top;
    }
    .feedback-table tr:nth-child(even) {
    background-color: #f7fafc;
    }
    .feedback-table tr:hover {
    background-color: #edf2f7;
    }
    .input-images {
    display: flex;
    flex-direction: column;
    gap: 10px;
    align-items: center;
    }
    .input-images img {
    max-width: 200px;
    max-height: 200px;
    border: 2px solid #cbd5e0;
    border-radius: 4px;
    }
    .result-image img {
    max-width: 100%;
    max-height: 500px;
    border: 2px solid #48bb78;
    border-radius: 4px;
    }
    .options-cell {
    font-size: 0.85em;
    line-height: 1.5;
    }
    .options-cell strong {
    color: #2d3748;
    }
    .image-gallery {
    display: flex;
    flex-wrap: wrap;
    gap: 8px;
    justify-content: center;
    }
    .image-gallery img {
    max-width: 100px;
    max-height: 100px;
    border: 1px solid #cbd5e0;
    border-radius: 4px;
    }
    .no-image {
    color: #a0aec0;
    font-style: italic;
    text-align: center;
    padding: 20px;
    }
    </style>
    <table class="feedback-table">
    <thead>
    <tr>
    <th>Input Images</th>
    <th>Result Image</th>
    <th>Info</th>
    <th>Extra Images</th>
    <th>Negative Images</th>
    </tr>
    </thead>
    <tbody>
    """]

    for feedback in feedbacks:
        uuid_value = str(feedback.get("uuid") or "N/A")
        timestamp = str(feedback.get("timestamp") or "Unknown")
        # Format timestamp for display: trim sub-second part, drop the "T"
        if len(timestamp) > 19:
            timestamp = timestamp[:19].replace("T", " ")
        rating = str(feedback.get("rating", "N/A"))
        feedback_text = feedback.get("feedback", "") or ""
        alpha_start = str(feedback.get("alpha_start", "N/A"))
        alpha_end = str(feedback.get("alpha_end", "N/A"))
        n_steps = str(feedback.get("n_steps", "N/A"))

        # Convert images from dataset format to PIL Images
        input1_img = convert_dataset_image_to_pil(feedback.get("input1"))
        input2_img = convert_dataset_image_to_pil(feedback.get("input2"))

        # Blending results list (new format) or single result (old format)
        blending_results_list = _to_pil_list(feedback.get("blending_results", []))
        if not blending_results_list:
            # Backward compatibility: check for old single image format
            old_result = convert_dataset_image_to_pil(feedback.get("blending_result"))
            if old_result:
                blending_results_list = [old_result]

        extra_imgs = _to_pil_list(feedback.get("extra_images"))
        negative_imgs = _to_pil_list(feedback.get("negative_images"))

        # Column 1: Input Images
        input_cells = []
        for label, image in (("Input 1", input1_img), ("Input 2", input2_img)):
            if image:
                data_url = pil_image_to_base64(image, max_size=512)
                input_cells.append(f'<img src="{data_url}" alt="{label}" title="{label}" />')
            else:
                input_cells.append(f'<div class="no-image">No {label}</div>')
        input_images_html = '<div class="input-images">' + "".join(input_cells) + '</div>'

        # Column 2: Result Image - one grid built from all result images
        if blending_results_list:
            n_images = len(blending_results_list)
            cols = min(4, n_images)
            rows = math.ceil(n_images / cols)
            blending_result_grid = create_image_grid(blending_results_list, rows=rows, cols=cols)
            result_base64 = pil_image_to_base64(blending_result_grid, max_size=99999)
            result_html = f'<div class="result-image"><img src="{result_base64}" alt="Result" title="Blending Result ({n_images} images)" /></div>'
        else:
            result_html = '<div class="no-image">No Result</div>'

        # Column 3: Info
        uuid_display = uuid_value[:8] + "..." if len(uuid_value) > 12 else uuid_value
        # The UUID goes into an inline JS handler: build a JS string literal
        # with json.dumps, then HTML-escape it for the attribute. HTML-escaping
        # alone is not enough because the browser decodes entities before the
        # script runs.
        uuid_js_literal = escape(json.dumps(uuid_value), quote=True)
        uuid_title = escape(uuid_value, quote=True)
        feedback_display = escape(feedback_text) if feedback_text else "None"
        options_html = f'''
        <div class="options-cell">
        <strong>Timestamp:</strong> {escape(timestamp)}<br/>
        <strong>UUID:</strong> <code onclick="navigator.clipboard.writeText({uuid_js_literal}).then(() => this.style.backgroundColor='#90EE90').catch(() => alert('Copy failed')); setTimeout(() => this.style.backgroundColor='#f0f0f0', 500);" style="font-size: 0.8em; background-color: #f0f0f0; padding: 2px 4px; border-radius: 3px; cursor: pointer;" title="Click to copy: {uuid_title}">{escape(uuid_display)}</code><br/>
        <strong>Rating:</strong> {escape(rating)}/5<br/>
        <strong>Alpha:</strong> {escape(alpha_start)} → {escape(alpha_end)}<br/>
        <strong>Steps:</strong> {escape(n_steps)}<br/>
        <strong>Feedback:</strong> {feedback_display}
        </div>
        '''

        # Columns 4 and 5: Extra / Negative Images
        extra_html = _gallery_html(extra_imgs, "Extra Image")
        negative_html = _gallery_html(negative_imgs, "Negative Image")

        # Add row
        html_parts.append(f"""
        <tr>
        <td>{input_images_html}</td>
        <td>{result_html}</td>
        <td>{options_html}</td>
        <td>{extra_html}</td>
        <td>{negative_html}</td>
        </tr>
        """)

    html_parts.append("</tbody></table>")

    # Add pagination info
    pagination_info = f"""
    <div style="margin-top: 20px; padding: 15px; background-color: #f7fafc; border-radius: 5px; text-align: center;">
    <strong>Showing entries {start_idx + 1}-{min(end_idx, total_items)} of {total_items}</strong><br/>
    <span style="color: #718096;">Page {page} of {total_pages}</span>
    </div>
    """
    html_parts.append(pagination_info)

    html_content = "".join(html_parts)
    gr.Info(f"Loaded {len(feedbacks)} feedback entries (page {page}/{total_pages})")
    return html_content, page, f"**Total Pages:** {total_pages}"
| # Refresh button - loads first page | |
def refresh_feedback(items_per_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private):
    """Reload the feedback table with the current filters, starting at page 1."""
    first_page = 1
    return load_and_display_feedback(
        items_per_page,
        first_page,
        sort_order,
        uuid_search=uuid_search,
        timestamp_start=timestamp_start,
        timestamp_end=timestamp_end,
        rating_filter=rating_filter,
        filter_extra=filter_extra,
        filter_negative=filter_negative,
        include_private=include_private,
    )
| refresh_button.click( | |
| refresh_feedback, | |
| inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| # Sort order radio - reset to page 1 when sort order changes | |
def on_sort_order_change(items_per_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private):
    """Re-render the table after the sort order changes.

    A new ordering makes the old page position meaningless, so the view
    always jumps back to the first page.
    """
    filter_args = (
        uuid_search,
        timestamp_start,
        timestamp_end,
        rating_filter,
        filter_extra,
        filter_negative,
        include_private,
    )
    return load_and_display_feedback(items_per_page, 1, sort_order, *filter_args)
| sort_order_radio.change( | |
| on_sort_order_change, | |
| inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| # Search button | |
def on_search(items_per_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private):
    """Apply the current search and filter settings and show the first page of results."""
    rendered = load_and_display_feedback(
        items_per_page,
        1,  # a new search always starts from the first page
        sort_order,
        uuid_search,
        timestamp_start,
        timestamp_end,
        rating_filter,
        filter_extra,
        filter_negative,
        include_private,
    )
    return rendered
| search_button.click( | |
| on_search, | |
| inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| # Rating filter change | |
| rating_filter.change( | |
| on_search, | |
| inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| # Checkbox changes | |
| filter_extra_images.change( | |
| on_search, | |
| inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| filter_negative_images.change( | |
| on_search, | |
| inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| # Include private checkbox change - refresh when toggled | |
| include_private_checkbox.change( | |
| on_search, | |
| inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| # Pagination controls | |
def on_page_change(items_per_page, page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private):
    """Render the page number the user entered, keeping all current filters."""
    return load_and_display_feedback(
        items_per_page,
        page,
        sort_order,
        uuid_search=uuid_search,
        timestamp_start=timestamp_start,
        timestamp_end=timestamp_end,
        rating_filter=rating_filter,
        filter_extra=filter_extra,
        filter_negative=filter_negative,
        include_private=include_private,
    )
def on_items_per_page_change(items_per_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private):
    """Re-render from page 1 after the page size changes.

    Changing the page size shifts which entries fall on each page, so the
    view restarts at the first page.
    """
    restart_page = 1
    filters = dict(
        uuid_search=uuid_search,
        timestamp_start=timestamp_start,
        timestamp_end=timestamp_end,
        rating_filter=rating_filter,
        filter_extra=filter_extra,
        filter_negative=filter_negative,
        include_private=include_private,
    )
    return load_and_display_feedback(items_per_page, restart_page, sort_order, **filters)
| page_number.change( | |
| on_page_change, | |
| inputs=[items_per_page_slider, page_number, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| items_per_page_slider.change( | |
| on_items_per_page_change, | |
| inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| # Previous/Next page buttons | |
def go_to_previous_page(items_per_page, current_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private):
    """Show the page before ``current_page``, never going below page 1.

    Args:
        items_per_page: Rows per page, forwarded unchanged.
        current_page: Page currently shown. ``None`` (a cleared Page field) or
            a non-numeric value is treated as page 1 instead of raising.
        sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter,
        filter_extra, filter_negative, include_private: Current filter
            settings, forwarded unchanged.

    Returns:
        Whatever ``load_and_display_feedback`` returns for the new page.
    """
    try:
        shown_page = int(current_page)
    except (TypeError, ValueError):
        shown_page = 1
    new_page = max(1, shown_page - 1)
    return load_and_display_feedback(items_per_page, new_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private)
def go_to_next_page(items_per_page, current_page, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private):
    """Show the page after ``current_page``, stopping at the last page.

    The request is simply forwarded with ``current_page + 1``.
    ``load_and_display_feedback`` applies every filter itself and clamps the
    requested page to ``1..total_pages``; when nothing matches it returns its
    "no entries" result with page 1. Re-loading the dataset and re-running
    the filters here just to compute the upper bound is therefore unnecessary,
    and it doubled the dataset load on every click.

    Args:
        items_per_page: Rows per page, forwarded unchanged.
        current_page: Page currently shown. ``None`` (a cleared Page field) or
            a non-numeric value is treated as page 1 instead of raising.
        sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter,
        filter_extra, filter_negative, include_private: Current filter
            settings, forwarded unchanged.

    Returns:
        Whatever ``load_and_display_feedback`` returns for the new page.
    """
    try:
        shown_page = int(current_page)
    except (TypeError, ValueError):
        shown_page = 1
    return load_and_display_feedback(items_per_page, shown_page + 1, sort_order, uuid_search, timestamp_start, timestamp_end, rating_filter, filter_extra, filter_negative, include_private)
| prev_page_button.click( | |
| go_to_previous_page, | |
| inputs=[items_per_page_slider, page_number, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| next_page_button.click( | |
| go_to_next_page, | |
| inputs=[items_per_page_slider, page_number, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| # Allow Enter key to trigger search | |
| uuid_search_input.submit( | |
| on_search, | |
| inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| timestamp_start_input.submit( | |
| on_search, | |
| inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
| timestamp_end_input.submit( | |
| on_search, | |
| inputs=[items_per_page_slider, sort_order_radio, uuid_search_input, timestamp_start_input, timestamp_end_input, rating_filter, filter_extra_images, filter_negative_images, include_private_checkbox], | |
| outputs=[feedback_html, page_number, total_pages_display] | |
| ) | |
if __name__ == "__main__":
    """Run the feedback viewer as a standalone application."""
    # Script entry point: serve a Gradio app that contains only the
    # feedback viewer tab.
    logging.basicConfig(level=logging.INFO)

    with gr.Blocks() as demo:
        create_feedback_viewer_tab()

    # share=True asks Gradio for a public link; show_error=True surfaces
    # handler exceptions in the UI.
    launch_options = {"share": True, "show_error": True}
    demo.launch(**launch_options)