Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import gc | |
| import json | |
| import random | |
| import threading | |
| from pathlib import Path | |
| from typing import Dict, Tuple | |
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| from .backends import FluxBilaBackend, Ip2pBilaBackend, release_cuda | |
| from .image_io import blend_strength, prepare_image, tensor_to_pil | |
| from .weights import configure_runtime_cache, require_paths, resolve_model_root | |
# The manifest sits one directory above the directory containing this module.
MANIFEST_PATH = Path(__file__).resolve().parents[1] / "model_manifest.json"


def load_manifest() -> Dict:
    """Parse the JSON document stored at ``MANIFEST_PATH``.

    Returns:
        The decoded JSON content, read as UTF-8 text.
    """
    manifest_text = MANIFEST_PATH.read_text(encoding="utf-8")
    return json.loads(manifest_text)
def seed_everything(seed: int) -> None:
    """Seed the global Python, NumPy and PyTorch random generators.

    Args:
        seed: Seed value; coerced with ``int()`` before use.
    """
    seed = int(seed)
    random.seed(seed)
    # NumPy's legacy global seeding accepts 0 .. 2**32 - 1 inclusive, so fold
    # by 2**32. Folding by 2**32 - 1 would make seed 2**32 - 1 collide with 0.
    np.random.seed(seed % 2**32)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
class DemoManager:
    """Serve image-edit requests using a single cached model backend.

    The manifest is loaded once at construction. At most one backend instance
    is kept at a time, keyed by its model key, and ``generate`` does its work
    while holding a lock so requests run one at a time.
    """

    def __init__(self):
        """Configure the runtime cache, load the manifest, start with no backend."""
        configure_runtime_cache()
        self.manifest = load_manifest()
        self._backend = None
        self._backend_key = None
        self._lock = threading.Lock()

    def model_choices(self):
        """Return ``(label, model_key)`` pairs for every model in the manifest."""
        return [
            (cfg["label"], key)
            for key, cfg in self.manifest["models"].items()
        ]

    def default_model(self):
        """Return the manifest's ``default_model`` entry."""
        return self.manifest["default_model"]

    def _paths_for_model(self, model_key: str) -> Dict[str, Path]:
        """Map each weight name of ``model_key`` to its path under the model root.

        The root comes from ``resolve_model_root`` and the relative weight
        paths are handed to ``require_paths`` before the mapping is built.
        NOTE(review): presumably ``require_paths`` raises when a file is
        missing -- confirm against its definition.
        """
        model_cfg = self.manifest["models"][model_key]
        root = resolve_model_root(model_key, model_cfg)
        require_paths(root, model_cfg["weights"].values())
        return {name: root / rel for name, rel in model_cfg["weights"].items()}

    def _load_backend(self, model_key: str):
        """Return the backend for ``model_key``, replacing the cached one if needed.

        The request is validated (known ``kind``, weight paths resolved)
        before the currently cached backend is dropped, so a request that
        cannot be satisfied leaves the cached backend in place.

        Raises:
            KeyError: ``model_key`` is not in the manifest.
            ValueError: The model's ``kind`` is neither ``"ip2p"`` nor ``"flux"``.
        """
        if self._backend_key == model_key and self._backend is not None:
            return self._backend

        model_cfg = self.manifest["models"][model_key]
        kind = model_cfg["kind"]
        if kind not in ("ip2p", "flux"):
            raise ValueError(f"Unknown model kind: {kind}")
        paths = self._paths_for_model(model_key)

        # Drop the previous backend before constructing the new one so the
        # two are never referenced at the same time.
        self._backend = None
        self._backend_key = None
        gc.collect()
        release_cuda()

        if kind == "ip2p":
            backend = Ip2pBilaBackend(model_cfg, paths)
        else:
            backend = FluxBilaBackend(model_cfg, paths)

        self._backend = backend
        self._backend_key = model_key
        return backend

    def generate(
        self,
        image: Image.Image,
        instruction: str,
        model_key: str,
        seed: int,
        max_side: int,
        strength: float,
    ) -> Tuple[Image.Image, Image.Image, Image.Image, str]:
        """Run one edit and return the images plus a status line.

        Args:
            image: Input image; must not be ``None``.
            instruction: Edit instruction; ``None`` becomes ``""`` and
                surrounding whitespace is stripped.
            model_key: Key of a model listed in the manifest.
            seed: Passed to ``seed_everything`` and echoed in the status.
            max_side: Forwarded to ``prepare_image``.
            strength: Forwarded to ``blend_strength``.

        Returns:
            ``(edited, diff, prepared.full_pil, status)`` where ``status`` is
            ``"<model label> | seed=<int(seed)>"``.

        Raises:
            ValueError: ``image`` is ``None`` or ``model_key`` is unknown.
        """
        if image is None:
            raise ValueError("Please upload an image.")
        instruction = (instruction or "").strip()
        if model_key not in self.manifest["models"]:
            raise ValueError(f"Unknown model: {model_key}")

        with self._lock:
            model_cfg = self.manifest["models"][model_key]
            model_size = int(model_cfg["config"].get("model_size", 512))
            prepared = prepare_image(image, max_side=max_side, model_size=model_size)
            backend = self._load_backend(model_key)
            seed_everything(seed)
            result = backend(
                prepared.model_tensor,
                [instruction],
                prepared.full_tensor,
            )
            # The backend result is indexed by "bila" and "diff".
            edited_tensor = blend_strength(prepared.full_tensor, result["bila"], strength)
            edited = tensor_to_pil(edited_tensor)
            diff = tensor_to_pil(result["diff"])
            status = f"{model_cfg['label']} | seed={int(seed)}"
            return edited, diff, prepared.full_pil, status