from __future__ import annotations import os import sys from pathlib import Path try: import spaces except ImportError: class _SpacesCompat: @staticmethod def GPU(*decorator_args, **decorator_kwargs): if decorator_args and callable(decorator_args[0]) and len(decorator_args) == 1 and not decorator_kwargs: return decorator_args[0] def decorator(fn): return fn return decorator spaces = _SpacesCompat() import gradio as gr import torch try: from huggingface_hub import snapshot_download except Exception: snapshot_download = None CURRENT_FILE = Path(__file__).resolve() PROJECT_ROOT = CURRENT_FILE.parents[1] for candidate in (CURRENT_FILE.parent, CURRENT_FILE.parents[1]): if (candidate / "infer").exists() and (candidate / "models").exists(): PROJECT_ROOT = candidate break if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from demo.real_world_pipeline import ( # noqa: E402 DEFAULT_BBOX_MODEL, DEFAULT_STAGE2_MODEL_REPO_ID, DEFAULT_REAL_CONFIG_PATH, DEFAULT_RUN_NAME, DEFAULT_WORK_DIR, run_real_world_pipeline, ) from demo.hf_repo_assets import ensure_repo_assets, get_stage2_model_repo_id # noqa: E402 DEFAULT_EXAMPLE_DIR = Path( os.environ.get( "SYNLAYERS_EXAMPLE_DIR", str(PROJECT_ROOT / "demo" / "examples"), ) ) HF_HOME = Path(os.environ.get("HF_HOME", str(Path.home() / ".cache" / "huggingface"))) HF_HOME.mkdir(parents=True, exist_ok=True) os.environ["HF_HOME"] = str(HF_HOME) os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1") def read_int_env(name: str, default: int) -> int: raw = os.environ.get(name) if raw is None: return default try: return int(raw) except ValueError: return default def clamp(value: int, low: int, high: int) -> int: return max(low, min(value, high)) ZERO_GPU_SIZE = ( os.environ.get("SYNLAYERS_ZERO_GPU_SIZE", "large").strip() or "large" ).lower() # Keep this high enough for the full pipeline after model initialization. ZERO_GPU_DURATION = clamp( read_int_env("SYNLAYERS_ZERO_GPU_DURATION", 500), 60, 500, ) MODEL_PREFETCH_STATUS = { "enabled": os.environ.get("SYNLAYERS_DISABLE_PREFETCH", "0") != "1", "stage1_bbox_model": str(DEFAULT_BBOX_MODEL), "stage2_model_repo": get_stage2_model_repo_id(), "bbox_done": False, "stage2_done": False, "error": "", } def is_hf_repo_id(path_or_repo: str | Path | None) -> bool: if path_or_repo is None: return False value = str(path_or_repo) if not value: return False # Local path. if value.startswith("/") or value.startswith("./") or value.startswith("../"): return False # HF repo id usually looks like "namespace/repo". return "/" in value and not Path(value).exists() def prefetch_one_model(repo_id_or_path: str | Path | None, label: str) -> bool: if snapshot_download is None: MODEL_PREFETCH_STATUS["error"] += ( f"\n- Cannot prefetch {label}: huggingface_hub.snapshot_download is unavailable." ) return False if not is_hf_repo_id(repo_id_or_path): return True repo_id = str(repo_id_or_path) try: snapshot_download( repo_id=repo_id, local_files_only=False, resume_download=True, allow_patterns=[ "config.json", "generation_config.json", "preprocessor_config.json", "processor_config.json", "tokenizer.json", "tokenizer_config.json", "special_tokens_map.json", "merges.txt", "vocab.json", "*.py", "*.json", "*.safetensors", "*.safetensors.index.json", "*.bin", "*.pt", ], ignore_patterns=[ ".git/*", "*.md", "*.txt", "*.png", "*.jpg", "*.jpeg", "*.webp", "*.mp4", "*.zip", "*.tar", "*.tar.gz", ], ) return True except Exception as exc: MODEL_PREFETCH_STATUS["error"] += f"\n- Failed to prefetch {label} `{repo_id}`: {exc}" return False def prefetch_model_assets() -> None: """ Download model files before the ZeroGPU function is called. This does not instantiate the models. It only ensures files are already in the Hugging Face cache, so download time is not counted inside @spaces.GPU. Model objects are cached in demo/real_world_pipeline.py after their first construction in the running process. """ if not MODEL_PREFETCH_STATUS["enabled"]: return stage1_bbox_model = ( os.environ.get("SYNLAYERS_BBOX_MODEL") or os.environ.get("SYNLAYERS_BBOX_MODEL_REPO") or DEFAULT_BBOX_MODEL ) stage2_model_repo = get_stage2_model_repo_id() MODEL_PREFETCH_STATUS["stage1_bbox_model"] = str(stage1_bbox_model) MODEL_PREFETCH_STATUS["stage2_model_repo"] = str(stage2_model_repo) bbox_ok = prefetch_one_model(stage1_bbox_model, "Stage 1 bbox model") stage2_ok = prefetch_one_model(stage2_model_repo, "Stage 2 model repo") try: ensure_repo_assets(stage2_model_repo) except Exception as exc: MODEL_PREFETCH_STATUS["error"] += ( f"\n- Failed to prefetch Stage 2 runtime assets from `{stage2_model_repo}`: {exc}" ) stage2_ok = False MODEL_PREFETCH_STATUS["bbox_done"] = bool(bbox_ok) MODEL_PREFETCH_STATUS["stage2_done"] = bool(stage2_ok) # Run prefetch during Space startup, outside the ZeroGPU-decorated function. prefetch_model_assets() def list_example_images(limit: int = 6) -> list[list[str]]: if not DEFAULT_EXAMPLE_DIR.exists(): return [] candidates = [] for ext in ("*.png", "*.jpg", "*.jpeg", "*.webp"): candidates.extend(DEFAULT_EXAMPLE_DIR.glob(ext)) candidates = sorted(candidates)[:limit] return [[str(path)] for path in candidates] def build_gallery(result: dict) -> list[tuple[str, str]]: gallery: list[tuple[str, str]] = [] if result.get("whole_image_rgba"): gallery.append((result["whole_image_rgba"], "Whole RGBA")) if result.get("background_rgba"): gallery.append((result["background_rgba"], "Background RGBA")) for idx, path in enumerate(result.get("layer_images", [])): gallery.append((path, f"Layer {idx}")) return gallery def get_gpu_name() -> str: if not torch.cuda.is_available(): return "None" try: return torch.cuda.get_device_name(torch.cuda.current_device()) except Exception as exc: return f"Unavailable ({exc})" def is_zero_gpu_space() -> bool: accelerator = os.environ.get("ACCELERATOR", "").lower() return ( os.environ.get("ZEROGPU_V2", "").lower() == "true" or os.environ.get("ZERO_GPU_PATCH_TORCH_DEVICE") == "1" or accelerator == "zerogpu" or accelerator.startswith("zero") ) def get_runtime_status_markdown() -> str: accelerator = os.environ.get("ACCELERATOR", "unknown") space_id = os.environ.get("SPACE_ID", "local") stage1_bbox_model = ( os.environ.get("SYNLAYERS_BBOX_MODEL") or os.environ.get("SYNLAYERS_BBOX_MODEL_REPO") or DEFAULT_BBOX_MODEL ) stage2_model_repo = get_stage2_model_repo_id() zero_gpu_enabled = is_zero_gpu_space() lines = [ "## Runtime Status", f"- `SPACE_ID`: `{space_id}`", f"- `ACCELERATOR`: `{accelerator}`", f"- `HF_HOME`: `{os.environ.get('HF_HOME', '')}`", f"- `Stage 1 bbox repo/path`: `{stage1_bbox_model}`", f"- `Stage 2 repo`: `{stage2_model_repo}`", "", "## Model Asset Prefetch", f"- `Prefetch enabled`: `{MODEL_PREFETCH_STATUS['enabled']}`", f"- `Stage 1 bbox model`: `{MODEL_PREFETCH_STATUS['stage1_bbox_model']}`", f"- `Stage 2 model repo`: `{MODEL_PREFETCH_STATUS['stage2_model_repo']}`", f"- `Stage 1 files prefetched`: `{MODEL_PREFETCH_STATUS['bbox_done']}`", f"- `Stage 2 assets prefetched`: `{MODEL_PREFETCH_STATUS['stage2_done']}`", ] if MODEL_PREFETCH_STATUS["error"]: lines.extend( [ "", "### Prefetch Warnings", MODEL_PREFETCH_STATUS["error"], ] ) lines.append("") if zero_gpu_enabled: lines.extend( [ "## ZeroGPU", f"- `ZeroGPU mode`: `True`", f"- `Requested GPU size`: `{ZERO_GPU_SIZE}`", f"- `Requested max duration`: `{ZERO_GPU_DURATION}` seconds", f"- `CUDA probe outside @spaces.GPU`: `{torch.cuda.is_available()}`", "", "This Space is configured for Hugging Face ZeroGPU.", "A shared GPU is requested on demand when you click `Run Full Pipeline`.", "Model files are prefetched during Space startup, before the ZeroGPU function is called.", "After the first successful request, model objects are reused while the Python process stays alive.", ] ) else: cuda_available = torch.cuda.is_available() lines.extend( [ "## CUDA", f"- `CUDA available`: `{cuda_available}`", f"- `GPU device`: `{get_gpu_name()}`", "", ] ) if accelerator == "none" or not cuda_available: lines.extend( [ "This Space is not currently running with a usable CUDA GPU.", "The GPU type must be chosen by the Space owner in Hugging Face `Settings -> Hardware`.", "Visitors cannot switch GPUs from inside the Gradio app.", ] ) else: lines.append("The CUDA runtime is available and the full SynLayers pipeline can run here.") return "\n".join(lines) @spaces.GPU(duration=ZERO_GPU_DURATION, size=ZERO_GPU_SIZE) def run_demo_inference( image_path: str, sample_name: str, max_new_tokens: int, seed_value: float, ) -> dict: seed = int(seed_value) if seed_value >= 0 else None return run_real_world_pipeline( image_path=image_path, sample_name=sample_name or None, work_dir=DEFAULT_WORK_DIR, bbox_model=DEFAULT_BBOX_MODEL, config_path=DEFAULT_REAL_CONFIG_PATH, max_new_tokens=int(max_new_tokens), seed=seed, run_name=DEFAULT_RUN_NAME, ) def run_demo( image_path: str, sample_name: str, max_new_tokens: int, seed_value: float, ): if not image_path: raise gr.Error("Please upload an input image first.") try: result = run_demo_inference( image_path=image_path, sample_name=sample_name, max_new_tokens=max_new_tokens, seed_value=seed_value, ) except Exception as exc: raise gr.Error(str(exc)) from exc return ( result["bbox_visualization"], result["merged_image"], result["bbox_record"].get("whole_caption", ""), result["bbox_record"], result["metadata"], build_gallery(result), result["archive_path"], result["case_dir"], ) with gr.Blocks(title="SynLayers Real-World Demo") as demo: gr.Markdown( """ # SynLayers Real-World Decomposition Upload a single image and run the full pipeline in one step: 1. VLM for whole-caption + bounding-box detection 2. SynLayers real-image layer decomposition This Space can run either on a dedicated GPU Space or on Hugging Face ZeroGPU. The first request may still take time while Python modules and model objects are initialized. Model files are prefetched during Space startup, and initialized model objects are reused while the process stays alive. """ ) runtime_status = gr.Markdown(get_runtime_status_markdown()) refresh_status_button = gr.Button("Refresh Runtime Status") with gr.Row(): with gr.Column(scale=1): image_input = gr.Image(type="filepath", label="Input Image") sample_name_input = gr.Textbox( label="Optional Sample Name", placeholder="Leave empty to use the uploaded filename", ) max_new_tokens_input = gr.Slider( minimum=128, maximum=2048, value=1024, step=64, label="VLM Max New Tokens", ) seed_input = gr.Number( value=42, precision=0, label="Seed (-1 keeps config default)", ) run_button = gr.Button("Run Full Pipeline", variant="primary") with gr.Column(scale=1): bbox_vis_output = gr.Image(type="filepath", label="Detected Bounding Boxes") merged_output = gr.Image(type="filepath", label="Merged Decomposition") caption_output = gr.Textbox(label="Whole Caption", lines=6) with gr.Row(): bbox_json_output = gr.JSON(label="BBox JSON") meta_json_output = gr.JSON(label="Inference Metadata") layer_gallery = gr.Gallery(label="Predicted Layers", columns=4, height="auto") with gr.Row(): archive_output = gr.File(label="Download Result Bundle") case_dir_output = gr.Textbox(label="Saved Case Directory") examples = list_example_images() if examples: gr.Examples(examples=examples, inputs=[image_input], label="Example Images") refresh_status_button.click( fn=get_runtime_status_markdown, outputs=runtime_status, ) run_button.click( fn=run_demo, inputs=[ image_input, sample_name_input, max_new_tokens_input, seed_input, ], outputs=[ bbox_vis_output, merged_output, caption_output, bbox_json_output, meta_json_output, layer_gallery, archive_output, case_dir_output, ], ) if __name__ == "__main__": demo.queue().launch( server_name="0.0.0.0", server_port=int(os.environ.get("PORT", "7860")), )