Spaces:
Running on Zero
Running on Zero
| """ | |
| HistoryManager - Centralized image history and metadata management. | |
| This module provides a unified API for managing generation history across | |
| both Streamlit and Gradio UIs with features including: | |
| - Type-safe HistoryEntry dataclass | |
| - In-memory caching with invalidation | |
| - Deduplication by image path | |
| - Backup rotation for corrupt JSON recovery | |
| - Search and filtering capabilities | |
| """ | |
| import os | |
| import re | |
| import json | |
| import glob | |
| import time | |
| import shutil | |
| from dataclasses import dataclass, field, asdict | |
| from typing import Optional, List, Dict, Any, Callable | |
| from PIL import Image | |
| # Configuration | |
| HISTORY_FILE = "./webui_history.json" | |
| BACKUP_DIR = "./.history_backups" | |
| MAX_HISTORY_ENTRIES = 100 | |
| MAX_BACKUPS = 3 | |
| class HistoryEntry: | |
| """Type-safe representation of a history entry.""" | |
| timestamp: str | |
| image_path: str | |
| prompt: str = "" | |
| negative_prompt: str = "" | |
| width: Optional[int] = None | |
| height: Optional[int] = None | |
| batch_size: Optional[int] = None | |
| model_type: Optional[str] = None | |
| model_path: Optional[str] = None | |
| seed: Optional[str] = None | |
| sampler: Optional[str] = None | |
| steps: Optional[int] = None | |
| generation_duration: Optional[float] = None | |
| avg_iters_per_s: Optional[float] = None | |
| cfg: Optional[float] = None | |
| scheduler: Optional[str] = None | |
| denoise: Optional[float] = None | |
| png_metadata: Dict[str, Any] = field(default_factory=dict) | |
| def to_dict(self) -> Dict[str, Any]: | |
| """Convert entry to dictionary for JSON serialization.""" | |
| return asdict(self) | |
| def from_dict(cls, data: Dict[str, Any]) -> "HistoryEntry": | |
| """Create entry from dictionary, handling missing fields gracefully.""" | |
| # Filter to only valid fields | |
| valid_fields = {f.name for f in cls.__dataclass_fields__.values()} | |
| filtered = {k: v for k, v in data.items() if k in valid_fields} | |
| return cls(**filtered) | |
| def sanitize_seed_for_display(seed_value: Any) -> Optional[str]: | |
| """ | |
| Return a safe seed string or None if the value looks like a tensor/image dump. | |
| Handles various edge cases: | |
| - Numeric values (int/float) | |
| - String representations with tensor dumps | |
| - Very long strings that indicate binary/array data | |
| - Extracts numeric tokens from mixed content | |
| """ | |
| if seed_value is None: | |
| return None | |
| if isinstance(seed_value, (int, float)): | |
| return str(int(seed_value)) | |
| if isinstance(seed_value, str): | |
| s = seed_value.strip() | |
| # Detect and reject tensor/array dumps | |
| if any(pattern in s.lower() for pattern in ["tensor(", "array(", "[[", "]]"]): | |
| # Try to extract numeric token | |
| m = re.search(r"(\d{4,})", s) | |
| return m.group(0) if m else None | |
| # Reject multiline or excessively long strings | |
| if "\n" in s or len(s) > 240: | |
| m = re.search(r"(\d{4,})", s) | |
| return m.group(0) if m else None | |
| # Reject bracket-heavy content (likely JSON/list dumps) | |
| if s.count("[") > 2 or s.count("{") > 2: | |
| m = re.search(r"(\d{4,})", s) | |
| return m.group(0) if m else None | |
| # Reject array-like content (starts with [ and contains commas) | |
| if s.startswith("[") and "," in s: | |
| m = re.search(r"(\d{4,})", s) | |
| return m.group(0) if m else None | |
| return s if s else None | |
| return None | |
| def _parse_float_safe(value: Any) -> Optional[float]: | |
| """Safely parse a float value, handling string suffixes like 's'.""" | |
| if value is None: | |
| return None | |
| try: | |
| return float(value) | |
| except (ValueError, TypeError): | |
| try: | |
| return float(str(value).rstrip('s')) | |
| except (ValueError, TypeError): | |
| return None | |
| def _parse_int_safe(value: Any) -> Optional[int]: | |
| """Safely parse an integer value.""" | |
| if value is None: | |
| return None | |
| try: | |
| if isinstance(value, str) and value.isdigit(): | |
| return int(value) | |
| return int(value) | |
| except (ValueError, TypeError): | |
| return None | |
| class HistoryManager: | |
| """ | |
| Centralized manager for image generation history. | |
| Features: | |
| - In-memory caching for fast access | |
| - Automatic deduplication by image path | |
| - Backup rotation for data safety | |
| - Search and filter capabilities | |
| """ | |
| def __init__(self, history_file: str = HISTORY_FILE): | |
| self.history_file = history_file | |
| self._cache: Optional[List[HistoryEntry]] = None | |
| self._cache_mtime: float = 0 | |
| def _create_backup(self) -> None: | |
| """Create a backup of the current history file.""" | |
| if not os.path.exists(self.history_file): | |
| return | |
| os.makedirs(BACKUP_DIR, exist_ok=True) | |
| # Rotate existing backups | |
| for i in range(MAX_BACKUPS - 1, 0, -1): | |
| old_backup = os.path.join(BACKUP_DIR, f"history_backup_{i}.json") | |
| new_backup = os.path.join(BACKUP_DIR, f"history_backup_{i + 1}.json") | |
| if os.path.exists(old_backup): | |
| if i + 1 > MAX_BACKUPS: | |
| os.remove(old_backup) | |
| else: | |
| shutil.move(old_backup, new_backup) | |
| # Create new backup | |
| backup_path = os.path.join(BACKUP_DIR, "history_backup_1.json") | |
| try: | |
| shutil.copy2(self.history_file, backup_path) | |
| except Exception: | |
| pass # Best effort backup | |
| def _restore_from_backup(self) -> List[Dict[str, Any]]: | |
| """Attempt to restore history from the most recent valid backup.""" | |
| if not os.path.exists(BACKUP_DIR): | |
| return [] | |
| for i in range(1, MAX_BACKUPS + 1): | |
| backup_path = os.path.join(BACKUP_DIR, f"history_backup_{i}.json") | |
| if os.path.exists(backup_path): | |
| try: | |
| with open(backup_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| if isinstance(data, list): | |
| # Restore successful - copy backup to main file | |
| shutil.copy2(backup_path, self.history_file) | |
| return data | |
| except Exception: | |
| continue | |
| return [] | |
| def _invalidate_cache(self) -> None: | |
| """Invalidate the in-memory cache.""" | |
| self._cache = None | |
| self._cache_mtime = 0 | |
| def _is_cache_valid(self) -> bool: | |
| """Check if cache is still valid based on file modification time.""" | |
| if self._cache is None: | |
| return False | |
| try: | |
| current_mtime = os.path.getmtime(self.history_file) | |
| return current_mtime == self._cache_mtime | |
| except OSError: | |
| return False | |
| def load(self, use_cache: bool = True) -> List[HistoryEntry]: | |
| """ | |
| Load history entries from disk with caching. | |
| Args: | |
| use_cache: If True, return cached data if available and valid. | |
| Returns: | |
| List of HistoryEntry objects, deduplicated by image_path. | |
| """ | |
| if use_cache and self._is_cache_valid(): | |
| return self._cache | |
| raw_data = [] | |
| if os.path.exists(self.history_file): | |
| try: | |
| with open(self.history_file, "r", encoding="utf-8") as f: | |
| raw_data = json.load(f) | |
| except json.JSONDecodeError: | |
| # Attempt restore from backup | |
| raw_data = self._restore_from_backup() | |
| except Exception: | |
| raw_data = [] | |
| # Convert to HistoryEntry and deduplicate | |
| seen_paths = set() | |
| entries = [] | |
| for item in raw_data: | |
| if not isinstance(item, dict): | |
| continue | |
| path = item.get("image_path") | |
| if path and path in seen_paths: | |
| continue # Skip duplicate | |
| if path: | |
| seen_paths.add(path) | |
| # Normalize fields | |
| entry = self._normalize_entry(item) | |
| entries.append(entry) | |
| # Enforce max limit | |
| entries = entries[:MAX_HISTORY_ENTRIES] | |
| # Update cache | |
| self._cache = entries | |
| try: | |
| self._cache_mtime = os.path.getmtime(self.history_file) | |
| except OSError: | |
| self._cache_mtime = 0 | |
| return entries | |
| def _normalize_entry(self, data: Dict[str, Any]) -> HistoryEntry: | |
| """Normalize a raw dictionary into a HistoryEntry with sanitized fields.""" | |
| png_meta = data.get("png_metadata") or {} | |
| # Normalize seed | |
| seed = sanitize_seed_for_display(data.get("seed")) | |
| if not seed and isinstance(png_meta, dict): | |
| seed = sanitize_seed_for_display(png_meta.get("seed")) | |
| # Normalize dimensions | |
| width = _parse_int_safe(data.get("width")) | |
| height = _parse_int_safe(data.get("height")) | |
| # Try to get dimensions from image if missing | |
| if (width is None or height is None) and data.get("image_path"): | |
| img_path = data.get("image_path") | |
| if os.path.exists(img_path): | |
| try: | |
| with Image.open(img_path) as img: | |
| width, height = img.size | |
| except Exception: | |
| pass | |
| # Normalize numeric fields | |
| steps = _parse_int_safe(data.get("steps") or png_meta.get("steps")) | |
| cfg = _parse_float_safe(data.get("cfg") or png_meta.get("cfg")) | |
| generation_duration = _parse_float_safe( | |
| data.get("generation_duration") or png_meta.get("generation_duration") | |
| ) | |
| avg_iters_per_s = _parse_float_safe( | |
| data.get("avg_iters_per_s") or png_meta.get("avg_iters_per_s") | |
| ) | |
| denoise = _parse_float_safe(data.get("denoise") or png_meta.get("denoise")) | |
| return HistoryEntry( | |
| timestamp=data.get("timestamp", ""), | |
| image_path=data.get("image_path", ""), | |
| prompt=data.get("prompt") or png_meta.get("prompt", ""), | |
| negative_prompt=data.get("negative_prompt") or png_meta.get("negative_prompt", ""), | |
| width=width, | |
| height=height, | |
| batch_size=_parse_int_safe(data.get("batch_size")), | |
| model_type=data.get("model_type") or png_meta.get("model_type"), | |
| model_path=data.get("model_path") or png_meta.get("model_path"), | |
| seed=seed, | |
| sampler=data.get("sampler") or png_meta.get("sampler"), | |
| steps=steps, | |
| generation_duration=generation_duration, | |
| avg_iters_per_s=avg_iters_per_s, | |
| cfg=cfg, | |
| scheduler=data.get("scheduler") or png_meta.get("scheduler"), | |
| denoise=denoise, | |
| png_metadata=png_meta if isinstance(png_meta, dict) else {}, | |
| ) | |
| def save(self, entries: List[HistoryEntry]) -> bool: | |
| """ | |
| Save history entries to disk with backup rotation. | |
| Args: | |
| entries: List of HistoryEntry objects to save. | |
| Returns: | |
| True if save was successful, False otherwise. | |
| """ | |
| # Create backup before overwriting | |
| self._create_backup() | |
| # Enforce limit | |
| entries = entries[:MAX_HISTORY_ENTRIES] | |
| try: | |
| data = [e.to_dict() for e in entries] | |
| with open(self.history_file, "w", encoding="utf-8") as f: | |
| json.dump(data, f, indent=2, ensure_ascii=False) | |
| # Invalidate cache to force reload | |
| self._invalidate_cache() | |
| return True | |
| except Exception: | |
| return False | |
| def add_entry(self, entry: HistoryEntry) -> bool: | |
| """ | |
| Add a new entry to the history (at the beginning). | |
| Args: | |
| entry: The HistoryEntry to add. | |
| Returns: | |
| True if successful, False otherwise. | |
| """ | |
| entries = self.load(use_cache=False) | |
| # Remove any existing entry with the same path | |
| entries = [e for e in entries if e.image_path != entry.image_path] | |
| # Insert at beginning | |
| entries.insert(0, entry) | |
| return self.save(entries) | |
| def add_from_image_paths( | |
| self, | |
| image_paths: List[str], | |
| settings: Optional[Dict[str, Any]] = None | |
| ) -> bool: | |
| """ | |
| Add entries from a list of image paths, extracting PNG metadata. | |
| Args: | |
| image_paths: List of paths to PNG images. | |
| settings: Optional settings dict to supplement PNG metadata. | |
| Returns: | |
| True if all entries were added successfully. | |
| """ | |
| settings = settings or {} | |
| entries = self.load(use_cache=False) | |
| timestamp = time.strftime("%Y-%m-%d %H:%M:%S") | |
| for img_path in image_paths: | |
| if not os.path.exists(img_path): | |
| continue | |
| # Read PNG metadata and dimensions | |
| png_meta = {} | |
| width, height = None, None | |
| try: | |
| with Image.open(img_path) as img: | |
| png_meta = getattr(img, "info", {}) or {} | |
| width, height = img.size | |
| except Exception: | |
| continue | |
| # Create normalized entry | |
| entry_data = { | |
| "timestamp": timestamp, | |
| "image_path": img_path, | |
| "prompt": png_meta.get("prompt") or settings.get("prompt", ""), | |
| "negative_prompt": png_meta.get("negative_prompt") or settings.get("negative_prompt", ""), | |
| "width": width, | |
| "height": height, | |
| "batch_size": settings.get("batch_size"), | |
| "model_type": png_meta.get("model_type"), | |
| "model_path": png_meta.get("model_path"), | |
| "seed": png_meta.get("seed"), | |
| "sampler": png_meta.get("sampler"), | |
| "steps": png_meta.get("steps"), | |
| "generation_duration": png_meta.get("generation_duration"), | |
| "avg_iters_per_s": png_meta.get("avg_iters_per_s"), | |
| "cfg": png_meta.get("cfg"), | |
| "scheduler": png_meta.get("scheduler"), | |
| "denoise": png_meta.get("denoise"), | |
| "png_metadata": png_meta, | |
| } | |
| entry = self._normalize_entry(entry_data) | |
| # Remove existing entry with same path | |
| entries = [e for e in entries if e.image_path != entry.image_path] | |
| entries.insert(0, entry) | |
| return self.save(entries) | |
| def delete_entry(self, index: int) -> bool: | |
| """ | |
| Delete an entry by index and remove the associated image file. | |
| Args: | |
| index: The index of the entry to delete. | |
| Returns: | |
| True if deletion was successful, False otherwise. | |
| """ | |
| entries = self.load(use_cache=False) | |
| if not (0 <= index < len(entries)): | |
| return False | |
| entry = entries[index] | |
| # Delete image file | |
| if entry.image_path and os.path.exists(entry.image_path): | |
| try: | |
| os.remove(entry.image_path) | |
| except Exception: | |
| pass # Continue even if file deletion fails | |
| # Remove from list | |
| entries.pop(index) | |
| return self.save(entries) | |
| def clear(self, delete_files: bool = True) -> bool: | |
| """ | |
| Clear all history entries. | |
| Args: | |
| delete_files: If True, also delete the associated image files. | |
| Returns: | |
| True if successful, False otherwise. | |
| """ | |
| if delete_files: | |
| entries = self.load(use_cache=False) | |
| for entry in entries: | |
| if entry.image_path and os.path.exists(entry.image_path): | |
| try: | |
| os.remove(entry.image_path) | |
| except Exception: | |
| pass | |
| return self.save([]) | |
| def scan_output_folders( | |
| self, | |
| output_dirs: Optional[List[str]] = None | |
| ) -> List[HistoryEntry]: | |
| """ | |
| Scan output folders for PNG images and build/update history. | |
| Args: | |
| output_dirs: List of directories to scan. Defaults to standard output dirs. | |
| Returns: | |
| Updated list of history entries. | |
| """ | |
| if output_dirs is None: | |
| output_dirs = [ | |
| "./output/Classic", | |
| "./output/HiresFix", | |
| "./output/Img2Img", | |
| "./output/Adetailer", | |
| "./output/ControlNet", | |
| "./output/Flux", | |
| ] | |
| # Collect all PNG files | |
| all_images = [] | |
| for output_dir in output_dirs: | |
| if os.path.exists(output_dir): | |
| images = glob.glob(f"{output_dir}/*.png") | |
| all_images.extend(images) | |
| # Sort by modification time (newest first) | |
| all_images = sorted(all_images, key=os.path.getmtime, reverse=True) | |
| # Get existing entries as a lookup | |
| existing = self.load(use_cache=False) | |
| existing_map = {e.image_path: e for e in existing} | |
| # Build new history preserving existing metadata | |
| new_entries = [] | |
| seen_paths = set() | |
| for img_path in all_images[:MAX_HISTORY_ENTRIES]: | |
| if img_path in seen_paths: | |
| continue | |
| seen_paths.add(img_path) | |
| if img_path in existing_map: | |
| new_entries.append(existing_map[img_path]) | |
| else: | |
| # Create new entry from image | |
| try: | |
| mtime = os.path.getmtime(img_path) | |
| timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(mtime)) | |
| with Image.open(img_path) as img: | |
| width, height = img.size | |
| png_meta = getattr(img, "info", {}) or {} | |
| entry_data = { | |
| "timestamp": timestamp, | |
| "image_path": img_path, | |
| "prompt": png_meta.get("prompt", "(prompt not available)"), | |
| "negative_prompt": png_meta.get("negative_prompt", ""), | |
| "width": width, | |
| "height": height, | |
| "png_metadata": png_meta, | |
| } | |
| new_entries.append(self._normalize_entry(entry_data)) | |
| except Exception: | |
| continue | |
| self.save(new_entries) | |
| return new_entries | |
| # ========================================================================= | |
| # Search and Filter Methods | |
| # ========================================================================= | |
| def search( | |
| self, | |
| keyword: Optional[str] = None, | |
| model_type: Optional[str] = None, | |
| date_from: Optional[str] = None, | |
| date_to: Optional[str] = None, | |
| min_width: Optional[int] = None, | |
| min_height: Optional[int] = None, | |
| ) -> List[HistoryEntry]: | |
| """ | |
| Search and filter history entries. | |
| Args: | |
| keyword: Search in prompt and negative_prompt (case-insensitive). | |
| model_type: Filter by model type (SD15, SDXL, Flux, etc.). | |
| date_from: Filter entries from this date (YYYY-MM-DD format). | |
| date_to: Filter entries until this date (YYYY-MM-DD format). | |
| min_width: Minimum image width. | |
| min_height: Minimum image height. | |
| Returns: | |
| Filtered list of HistoryEntry objects. | |
| """ | |
| entries = self.load() | |
| results = [] | |
| keyword_lower = keyword.lower() if keyword else None | |
| for entry in entries: | |
| # Keyword search | |
| if keyword_lower: | |
| prompt_match = keyword_lower in (entry.prompt or "").lower() | |
| neg_match = keyword_lower in (entry.negative_prompt or "").lower() | |
| if not (prompt_match or neg_match): | |
| continue | |
| # Model type filter | |
| if model_type and entry.model_type: | |
| if model_type.lower() not in entry.model_type.lower(): | |
| continue | |
| elif model_type and not entry.model_type: | |
| continue | |
| # Date range filter | |
| if date_from and entry.timestamp < date_from: | |
| continue | |
| if date_to and entry.timestamp > date_to + " 23:59:59": | |
| continue | |
| # Dimension filters | |
| if min_width and (entry.width is None or entry.width < min_width): | |
| continue | |
| if min_height and (entry.height is None or entry.height < min_height): | |
| continue | |
| results.append(entry) | |
| return results | |
| def get_model_types(self) -> List[str]: | |
| """Get a list of unique model types in the history.""" | |
| entries = self.load() | |
| types = {e.model_type for e in entries if e.model_type} | |
| return sorted(types) | |
| def get_date_range(self) -> tuple: | |
| """Get the date range of entries in history.""" | |
| entries = self.load() | |
| if not entries: | |
| return None, None | |
| dates = [e.timestamp[:10] for e in entries if e.timestamp] | |
| if not dates: | |
| return None, None | |
| return min(dates), max(dates) | |
| # Global singleton instance for convenience | |
| _default_manager: Optional[HistoryManager] = None | |
| def get_history_manager() -> HistoryManager: | |
| """Get the default HistoryManager singleton.""" | |
| global _default_manager | |
| if _default_manager is None: | |
| _default_manager = HistoryManager() | |
| return _default_manager | |