| import json |
| import math |
| import os |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| import threading |
| import time |
| import urllib.request |
| import uuid |
| import zipfile |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| try: |
| import huggingface_hub as _hf_hub |
|
|
| if not hasattr(_hf_hub, "HfFolder"): |
| class _CompatHfFolder: |
| path_token = os.environ.get( |
| "HF_TOKEN_PATH", |
| str(Path.home() / ".cache" / "huggingface" / "token"), |
| ) |
|
|
| @classmethod |
| def get_token(cls): |
| getter = getattr(_hf_hub, "get_token", None) |
| if callable(getter): |
| try: |
| return getter() |
| except Exception: |
| pass |
| token_path = Path(cls.path_token) |
| try: |
| token = token_path.read_text(encoding="utf-8").strip() |
| return token or None |
| except Exception: |
| return None |
|
|
| @classmethod |
| def save_token(cls, token: str): |
| token_path = Path(cls.path_token) |
| token_path.parent.mkdir(parents=True, exist_ok=True) |
| token_path.write_text(token, encoding="utf-8") |
|
|
| @classmethod |
| def delete_token(cls): |
| try: |
| Path(cls.path_token).unlink() |
| except FileNotFoundError: |
| pass |
|
|
| _hf_hub.HfFolder = _CompatHfFolder |
| except Exception: |
| pass |
|
|
| import cv2 |
| import gradio as gr |
| from huggingface_hub import HfApi, hf_hub_download |
| from PIL import Image, ImageOps |
|
|
| try: |
| import spaces |
| except Exception: |
| class _DummySpaces: |
| @staticmethod |
| def GPU(*args, **kwargs): |
| if args and callable(args[0]) and len(args) == 1 and not kwargs: |
| return args[0] |
|
|
| def decorator(fn): |
| return fn |
|
|
| return decorator |
|
|
| spaces = _DummySpaces() |
|
|
|
|
| APP_ROOT = Path(__file__).resolve().parent |
| WORK_ROOT = APP_ROOT / "workspace" |
| BACKEND_DIR = WORK_ROOT / "ComfyUI-SeedVR2_VideoUpscaler" |
| MODEL_DIR = APP_ROOT / "models" / "SEEDVR2" |
| JOBS_DIR = APP_ROOT / "jobs" |
| OUTPUTS_DIR = APP_ROOT / "outputs" |
|
|
| SEEDVR_BACKEND_GIT = "https://github.com/numz/ComfyUI-SeedVR2_VideoUpscaler.git" |
| SEEDVR_BACKEND_ZIP = ( |
| "https://codeload.github.com/numz/ComfyUI-SeedVR2_VideoUpscaler/zip/refs/heads/main" |
| ) |
|
|
| MODEL_SOURCES = { |
| "numz/SeedVR2_comfyUI": ".safetensors", |
| "cmeka/SeedVR2-GGUF": ".gguf", |
| } |
| VAE_REPO = "numz/SeedVR2_comfyUI" |
| VAE_FILE = "ema_vae_fp16.safetensors" |
|
|
| FALLBACK_MODELS = { |
| "numz/SeedVR2_comfyUI": [ |
| "seedvr2_ema_3b_fp16.safetensors", |
| "seedvr2_ema_3b_fp8_e4m3fn.safetensors", |
| "seedvr2_ema_7b_fp16.safetensors", |
| "seedvr2_ema_7b_fp8_e4m3fn.safetensors", |
| "seedvr2_ema_7b_sharp_fp16.safetensors", |
| "seedvr2_ema_7b_sharp_fp8_e4m3fn.safetensors", |
| ], |
| "cmeka/SeedVR2-GGUF": [ |
| "seedvr2_ema_3b-Q3_K_M.gguf", |
| "seedvr2_ema_3b-Q4_K_M.gguf", |
| "seedvr2_ema_3b-Q5_K_M.gguf", |
| "seedvr2_ema_3b-Q6_K.gguf", |
| "seedvr2_ema_3b-Q8_0.gguf", |
| "seedvr2_ema_7b-Q3_K_M.gguf", |
| "seedvr2_ema_7b-Q4_K_M.gguf", |
| "seedvr2_ema_7b-Q5_K_M.gguf", |
| "seedvr2_ema_7b-Q6_K.gguf", |
| "seedvr2_ema_7b-Q8_0.gguf", |
| "seedvr2_ema_7b_sharp-Q3_K_M.gguf", |
| "seedvr2_ema_7b_sharp-Q4_K_M.gguf", |
| "seedvr2_ema_7b_sharp-Q5_K_M.gguf", |
| "seedvr2_ema_7b_sharp-Q6_K.gguf", |
| "seedvr2_ema_7b_sharp-Q8_0.gguf", |
| ], |
| } |
|
|
| DEFAULT_MODELS = { |
| "numz/SeedVR2_comfyUI": "seedvr2_ema_3b_fp8_e4m3fn.safetensors", |
| "cmeka/SeedVR2-GGUF": "seedvr2_ema_3b-Q4_K_M.gguf", |
| } |
|
|
| RESIZE_MODE_LABELS = { |
| "pad": "保持比例并补边", |
| "crop": "保持比例并裁切", |
| "stretch": "强制拉伸到目标尺寸", |
| } |
|
|
| IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tif", ".tiff"} |
| VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v"} |
|
|
| SETUP_LOCK = threading.Lock() |
| DOWNLOAD_LOCK = threading.Lock() |
| MODEL_CACHE: Dict[str, List[str]] = {} |
| API = HfApi() |
|
|
| for folder in (WORK_ROOT, MODEL_DIR, JOBS_DIR, OUTPUTS_DIR): |
| folder.mkdir(parents=True, exist_ok=True) |
|
|
|
|
| def tail_text(text: str, limit: int = 6000) -> str: |
| text = (text or "").strip() |
| if len(text) <= limit: |
| return text |
| return "...\n" + text[-limit:] |
|
|
|
|
| def ensure_even(value: float) -> int: |
| value_int = max(2, int(round(float(value)))) |
| if value_int % 2 == 1: |
| value_int += 1 |
| return value_int |
|
|
|
|
| def optional_positive_int(value: Any) -> Optional[int]: |
| if value in (None, ""): |
| return None |
| value = int(float(value)) |
| if value <= 0: |
| return None |
| return value |
|
|
|
|
| def cleanup_old_jobs(max_age_hours: int = 12, keep_last: int = 30) -> None: |
| job_dirs = [p for p in JOBS_DIR.iterdir() if p.is_dir()] |
| job_dirs.sort(key=lambda p: p.stat().st_mtime, reverse=True) |
| cutoff = time.time() - max_age_hours * 3600 |
| for idx, job_dir in enumerate(job_dirs): |
| if idx < keep_last and job_dir.stat().st_mtime >= cutoff: |
| continue |
| try: |
| shutil.rmtree(job_dir, ignore_errors=True) |
| except Exception: |
| pass |
|
|
|
|
| def choose_default_model(repo_id: str, choices: List[str]) -> Optional[str]: |
| preferred = DEFAULT_MODELS.get(repo_id) |
| if preferred in choices: |
| return preferred |
| return choices[0] if choices else None |
|
|
|
|
| def is_image_file(path: str) -> bool: |
| return Path(path).suffix.lower() in IMAGE_EXTS |
|
|
|
|
| def is_video_file(path: str) -> bool: |
| return Path(path).suffix.lower() in VIDEO_EXTS |
|
|
|
|
| def probe_media(path: str) -> Dict[str, Any]: |
| path_obj = Path(path) |
| ext = path_obj.suffix.lower() |
| if ext in IMAGE_EXTS: |
| with Image.open(path) as img: |
| return { |
| "kind": "image", |
| "width": int(img.width), |
| "height": int(img.height), |
| "frames": 1, |
| "fps": 30.0, |
| "duration": 0.0, |
| } |
| if ext in VIDEO_EXTS: |
| cap = cv2.VideoCapture(path) |
| if not cap.isOpened(): |
| raise gr.Error(f"无法读取视频:{path_obj.name}") |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0) |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0) |
| fps = float(cap.get(cv2.CAP_PROP_FPS) or 30.0) |
| frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0) |
| cap.release() |
| duration = (frames / fps) if fps > 0 else 0.0 |
| return { |
| "kind": "video", |
| "width": width, |
| "height": height, |
| "frames": frames, |
| "fps": fps, |
| "duration": duration, |
| } |
| raise gr.Error("仅支持图片或视频文件。") |
|
|
|
|
| def compute_target_size( |
| src_w: int, |
| src_h: int, |
| scale_factor: Any, |
| out_w: Any, |
| out_h: Any, |
| ) -> Tuple[int, int, str]: |
| width = optional_positive_int(out_w) |
| height = optional_positive_int(out_h) |
| factor = 2.0 if scale_factor in (None, "") else float(scale_factor) |
| if factor <= 0: |
| raise gr.Error("超分倍率必须大于 0。") |
|
|
| if width and height: |
| target_w = ensure_even(width) |
| target_h = ensure_even(height) |
| reason = "使用自定义宽高" |
| elif width: |
| target_w = ensure_even(width) |
| target_h = ensure_even(width * src_h / src_w) |
| reason = "仅指定输出宽度,按原始比例推算高度" |
| elif height: |
| target_h = ensure_even(height) |
| target_w = ensure_even(height * src_w / src_h) |
| reason = "仅指定输出高度,按原始比例推算宽度" |
| else: |
| target_w = ensure_even(src_w * factor) |
| target_h = ensure_even(src_h * factor) |
| reason = f"按 {factor:.3f}x 倍率计算输出尺寸" |
|
|
| return target_w, target_h, reason |
|
|
|
|
| def ensure_seedvr_backend() -> Path: |
| with SETUP_LOCK: |
| cli_file = BACKEND_DIR / "inference_cli.py" |
| if cli_file.exists(): |
| return BACKEND_DIR |
|
|
| tmp_dir = BACKEND_DIR.with_name(BACKEND_DIR.name + "_tmp") |
| shutil.rmtree(tmp_dir, ignore_errors=True) |
|
|
| try: |
| subprocess.run( |
| ["git", "clone", "--depth", "1", SEEDVR_BACKEND_GIT, str(tmp_dir)], |
| check=True, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| text=True, |
| ) |
| except Exception: |
| zip_path = WORK_ROOT / "seedvr2_backend.zip" |
| extract_root = WORK_ROOT / ("extract_" + uuid.uuid4().hex[:8]) |
| extract_root.mkdir(parents=True, exist_ok=True) |
| try: |
| urllib.request.urlretrieve(SEEDVR_BACKEND_ZIP, zip_path) |
| with zipfile.ZipFile(zip_path, "r") as zf: |
| zf.extractall(extract_root) |
| extracted = None |
| for item in extract_root.iterdir(): |
| if item.is_dir() and item.name.startswith("ComfyUI-SeedVR2_VideoUpscaler"): |
| extracted = item |
| break |
| if extracted is None: |
| raise RuntimeError("下载后的 SeedVR2 后端目录结构不符合预期。") |
| shutil.move(str(extracted), str(tmp_dir)) |
| finally: |
| try: |
| if zip_path.exists(): |
| zip_path.unlink() |
| except Exception: |
| pass |
| shutil.rmtree(extract_root, ignore_errors=True) |
|
|
| if not (tmp_dir / "inference_cli.py").exists(): |
| shutil.rmtree(tmp_dir, ignore_errors=True) |
| raise gr.Error("SeedVR2 后端拉取失败,缺少 inference_cli.py。") |
|
|
| if BACKEND_DIR.exists(): |
| shutil.rmtree(BACKEND_DIR, ignore_errors=True) |
| tmp_dir.rename(BACKEND_DIR) |
| return BACKEND_DIR |
|
|
|
|
| def fetch_models_from_repo(repo_id: str, force: bool = False) -> List[str]: |
| if not force and repo_id in MODEL_CACHE: |
| return MODEL_CACHE[repo_id] |
|
|
| ext = MODEL_SOURCES[repo_id] |
| try: |
| files = API.list_repo_files(repo_id, repo_type="model") |
| models = sorted( |
| file_name |
| for file_name in files |
| if "/" not in file_name |
| and file_name.startswith("seedvr2_") |
| and file_name.endswith(ext) |
| ) |
| if models: |
| MODEL_CACHE[repo_id] = models |
| return models |
| except Exception: |
| pass |
|
|
| fallback = FALLBACK_MODELS[repo_id][:] |
| MODEL_CACHE[repo_id] = fallback |
| return fallback |
|
|
|
|
| def update_model_dropdown(repo_id: str, force: bool = False): |
| choices = fetch_models_from_repo(repo_id, force=force) |
| return gr.update(choices=choices, value=choose_default_model(repo_id, choices)) |
|
|
|
|
| def ensure_model_files(model_repo: str, model_file: str) -> Tuple[Path, Path]: |
| with DOWNLOAD_LOCK: |
| dit_path = Path( |
| hf_hub_download( |
| repo_id=model_repo, |
| filename=model_file, |
| repo_type="model", |
| local_dir=str(MODEL_DIR), |
| ) |
| ) |
| vae_path = Path( |
| hf_hub_download( |
| repo_id=VAE_REPO, |
| filename=VAE_FILE, |
| repo_type="model", |
| local_dir=str(MODEL_DIR), |
| ) |
| ) |
| return dit_path, vae_path |
|
|
|
|
| def build_job( |
| input_path: str, |
| model_repo: str, |
| model_file: str, |
| scale_factor: Any, |
| out_w: Any, |
| out_h: Any, |
| resize_mode: str, |
| color_correction: str, |
| expected_kind: str, |
| batch_size: Optional[int] = None, |
| temporal_overlap: Optional[int] = None, |
| chunk_size: Optional[int] = None, |
| ) -> Tuple[Dict[str, Any], str]: |
| cleanup_old_jobs() |
|
|
| if not input_path: |
| raise gr.Error("请先上传输入文件。") |
| if not model_repo or model_repo not in MODEL_SOURCES: |
| raise gr.Error("请选择模型仓库。") |
| if not model_file: |
| raise gr.Error("请选择模型文件。") |
| if resize_mode not in RESIZE_MODE_LABELS: |
| raise gr.Error("输出尺寸策略不合法。") |
|
|
| ensure_seedvr_backend() |
| dit_path, vae_path = ensure_model_files(model_repo, model_file) |
|
|
| source_meta = probe_media(input_path) |
| if source_meta["kind"] != expected_kind: |
| raise gr.Error(f"当前标签页只接受{ '图片' if expected_kind == 'image' else '视频' }文件。") |
|
|
| if expected_kind == "video": |
| if batch_size is None: |
| batch_size = 5 |
| batch_size = int(batch_size) |
| if batch_size != 1 and (batch_size - 1) % 4 != 0: |
| raise gr.Error("视频 batch_size 必须满足 4n+1,例如 1/5/9/13/17/21。") |
| temporal_overlap = int(temporal_overlap or 0) |
| chunk_size = int(chunk_size or 0) |
| else: |
| batch_size = 1 |
| temporal_overlap = 0 |
| chunk_size = 0 |
|
|
| target_w, target_h, size_reason = compute_target_size( |
| source_meta["width"], |
| source_meta["height"], |
| scale_factor, |
| out_w, |
| out_h, |
| ) |
|
|
| job_id = f"{time.strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:8]}" |
| job_dir = JOBS_DIR / job_id |
| job_dir.mkdir(parents=True, exist_ok=True) |
|
|
| staged_input = job_dir / f"input{Path(input_path).suffix.lower()}" |
| shutil.copy2(input_path, staged_input) |
|
|
| raw_output = job_dir / ("seedvr2_raw.png" if expected_kind == "image" else "seedvr2_raw.mp4") |
| final_output = job_dir / ("seedvr2_out.png" if expected_kind == "image" else "seedvr2_out.mp4") |
|
|
| job = { |
| "job_id": job_id, |
| "kind": expected_kind, |
| "input_path": str(staged_input), |
| "raw_output": str(raw_output), |
| "final_output": str(final_output), |
| "source_width": source_meta["width"], |
| "source_height": source_meta["height"], |
| "frames": source_meta["frames"], |
| "fps": source_meta["fps"], |
| "duration": source_meta["duration"], |
| "target_width": target_w, |
| "target_height": target_h, |
| "cli_resolution": min(target_w, target_h), |
| "cli_max_resolution": max(target_w, target_h), |
| "model_repo": model_repo, |
| "model_file": model_file, |
| "dit_path": str(dit_path), |
| "vae_path": str(vae_path), |
| "batch_size": batch_size, |
| "temporal_overlap": temporal_overlap, |
| "chunk_size": chunk_size, |
| "resize_mode": resize_mode, |
| "color_correction": color_correction, |
| "size_reason": size_reason, |
| } |
|
|
| summary_lines = [ |
| f"任务已准备:{job_id}", |
| f"输入类型:{'图片' if expected_kind == 'image' else '视频'}", |
| f"输入尺寸:{source_meta['width']}x{source_meta['height']}", |
| f"目标尺寸:{target_w}x{target_h}", |
| f"尺寸来源:{size_reason}", |
| f"尺寸策略:{RESIZE_MODE_LABELS[resize_mode]}", |
| f"模型仓库:{model_repo}", |
| f"模型文件:{model_file}", |
| f"本地模型:{dit_path.name} / {vae_path.name}", |
| ] |
| if expected_kind == "video": |
| summary_lines.extend( |
| [ |
| f"视频信息:{source_meta['frames']} 帧,{source_meta['fps']:.2f} FPS,{source_meta['duration']:.2f} 秒", |
| f"batch_size={batch_size},temporal_overlap={temporal_overlap},chunk_size={chunk_size}", |
| ] |
| ) |
| return job, "\n".join(summary_lines) |
|
|
|
|
| def estimate_job_duration(job: Optional[Dict[str, Any]]) -> int: |
| if not job: |
| return 180 |
|
|
| megapixels = (job.get("target_width", 1280) * job.get("target_height", 720)) / 1_000_000 |
| model_name = str(job.get("model_file", "")).lower() |
| is_7b = "7b" in model_name |
| is_gguf = model_name.endswith(".gguf") |
|
|
| if job.get("kind") == "image": |
| estimate = 120 + megapixels * 35 |
| if is_7b: |
| estimate += 80 |
| if is_gguf: |
| estimate += 20 |
| return int(max(120, min(600, estimate))) |
|
|
| frames = max(1, int(job.get("frames", 1))) |
| per_frame = 0.25 if not is_7b else 0.40 |
| if is_gguf: |
| per_frame *= 1.15 |
| estimate = 120 + frames * per_frame * max(1.0, megapixels / 0.9) |
| if int(job.get("chunk_size", 0)) > 0: |
| estimate += 45 |
| return int(max(180, min(120, estimate))) |
|
|
|
|
| def build_cli_command(job: Dict[str, Any]) -> List[str]: |
| backend_dir = ensure_seedvr_backend() |
| cmd = [ |
| sys.executable, |
| str(backend_dir / "inference_cli.py"), |
| job["input_path"], |
| "--output", |
| job["raw_output"], |
| "--output_format", |
| "png" if job["kind"] == "image" else "mp4", |
| "--model_dir", |
| str(MODEL_DIR), |
| "--dit_model", |
| job["model_file"], |
| "--resolution", |
| str(job["cli_resolution"]), |
| "--max_resolution", |
| str(job["cli_max_resolution"]), |
| "--batch_size", |
| str(job["batch_size"]), |
| "--color_correction", |
| str(job["color_correction"]), |
| ] |
|
|
| if job["kind"] == "video": |
| cmd.extend(["--video_backend", "opencv"]) |
| if int(job.get("temporal_overlap", 0)) > 0: |
| cmd.extend(["--temporal_overlap", str(job["temporal_overlap"])]) |
| if int(job.get("chunk_size", 0)) > 0: |
| cmd.extend(["--chunk_size", str(job["chunk_size"])]) |
| if int(job.get("batch_size", 1)) > 1: |
| cmd.append("--uniform_batch_size") |
| return cmd |
|
|
|
|
| def resize_image( |
| input_path: str, |
| output_path: str, |
| width: int, |
| height: int, |
| mode: str, |
| ) -> None: |
| with Image.open(input_path) as img: |
| has_alpha = img.mode in ("RGBA", "LA") or "transparency" in img.info |
| if has_alpha: |
| img = img.convert("RGBA") |
| else: |
| img = img.convert("RGB") |
|
|
| if mode == "stretch": |
| out = img.resize((width, height), resample=Image.LANCZOS) |
| elif mode == "crop": |
| out = ImageOps.fit(img, (width, height), method=Image.LANCZOS, centering=(0.5, 0.5)) |
| else: |
| contained = ImageOps.contain(img, (width, height), method=Image.LANCZOS) |
| if has_alpha: |
| bg_color = (0, 0, 0, 0) |
| out = Image.new("RGBA", (width, height), bg_color) |
| out.alpha_composite(contained, ((width - contained.width) // 2, (height - contained.height) // 2)) |
| else: |
| out = Image.new("RGB", (width, height), (0, 0, 0)) |
| out.paste(contained, ((width - contained.width) // 2, (height - contained.height) // 2)) |
|
|
| Path(output_path).parent.mkdir(parents=True, exist_ok=True) |
| out.save(output_path) |
|
|
|
|
| def resize_frame(frame, width: int, height: int, mode: str): |
| src_h, src_w = frame.shape[:2] |
| if mode == "stretch": |
| return cv2.resize(frame, (width, height), interpolation=cv2.INTER_LANCZOS4) |
|
|
| scale = max(width / src_w, height / src_h) if mode == "crop" else min(width / src_w, height / src_h) |
| scaled_w = max(1, int(round(src_w * scale))) |
| scaled_h = max(1, int(round(src_h * scale))) |
| resized = cv2.resize(frame, (scaled_w, scaled_h), interpolation=cv2.INTER_LANCZOS4) |
|
|
| if mode == "crop": |
| x0 = max(0, (scaled_w - width) // 2) |
| y0 = max(0, (scaled_h - height) // 2) |
| return resized[y0:y0 + height, x0:x0 + width] |
|
|
| channels = resized.shape[2] if len(resized.shape) == 3 else 1 |
| border_value = (0, 0, 0, 0) if channels == 4 else (0, 0, 0) |
| return cv2.copyMakeBorder( |
| resized, |
| (height - scaled_h) // 2, |
| height - scaled_h - (height - scaled_h) // 2, |
| (width - scaled_w) // 2, |
| width - scaled_w - (width - scaled_w) // 2, |
| cv2.BORDER_CONSTANT, |
| value=border_value, |
| ) |
|
|
|
|
| def resize_video_cv2( |
| input_path: str, |
| output_path: str, |
| width: int, |
| height: int, |
| mode: str, |
| fallback_fps: float = 30.0, |
| ) -> None: |
| cap = cv2.VideoCapture(input_path) |
| if not cap.isOpened(): |
| raise RuntimeError(f"无法读取中间视频:{input_path}") |
| fps = float(cap.get(cv2.CAP_PROP_FPS) or fallback_fps or 30.0) |
| fourcc = cv2.VideoWriter_fourcc(*"mp4v") |
| Path(output_path).parent.mkdir(parents=True, exist_ok=True) |
| writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) |
| if not writer.isOpened(): |
| cap.release() |
| raise RuntimeError("无法创建输出视频,请检查编码器。") |
|
|
| try: |
| while True: |
| ok, frame = cap.read() |
| if not ok: |
| break |
| frame = resize_frame(frame, width, height, mode) |
| if frame.shape[1] != width or frame.shape[0] != height: |
| frame = cv2.resize(frame, (width, height), interpolation=cv2.INTER_LANCZOS4) |
| if frame.shape[2] == 4: |
| frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR) |
| writer.write(frame) |
| finally: |
| cap.release() |
| writer.release() |
|
|
|
|
| def resize_video_ffmpeg( |
| input_path: str, |
| output_path: str, |
| width: int, |
| height: int, |
| mode: str, |
| ) -> None: |
| if mode == "stretch": |
| vf = f"scale={width}:{height}:flags=lanczos" |
| elif mode == "crop": |
| vf = ( |
| f"scale={width}:{height}:flags=lanczos:force_original_aspect_ratio=increase," |
| f"crop={width}:{height}" |
| ) |
| else: |
| vf = ( |
| f"scale={width}:{height}:flags=lanczos:force_original_aspect_ratio=decrease," |
| f"pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:color=black" |
| ) |
|
|
| cmd = [ |
| "ffmpeg", |
| "-y", |
| "-i", |
| input_path, |
| "-vf", |
| vf, |
| "-an", |
| "-c:v", |
| "libx264", |
| "-pix_fmt", |
| "yuv420p", |
| "-movflags", |
| "+faststart", |
| output_path, |
| ] |
| subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) |
|
|
|
|
| def finalize_output(job: Dict[str, Any]) -> Dict[str, Any]: |
| raw_output = Path(job["raw_output"]) |
| final_output = Path(job["final_output"]) |
| target_w = int(job["target_width"]) |
| target_h = int(job["target_height"]) |
| resize_mode = str(job["resize_mode"]) |
|
|
| if not raw_output.exists(): |
| raise RuntimeError("SeedVR2 已运行,但没有找到输出文件。") |
|
|
| raw_meta = probe_media(str(raw_output)) |
| already_exact = ( |
| raw_meta["width"] == target_w |
| and raw_meta["height"] == target_h |
| ) |
|
|
| if already_exact: |
| shutil.move(str(raw_output), str(final_output)) |
| elif job["kind"] == "image": |
| resize_image(str(raw_output), str(final_output), target_w, target_h, resize_mode) |
| else: |
| try: |
| if shutil.which("ffmpeg"): |
| resize_video_ffmpeg(str(raw_output), str(final_output), target_w, target_h, resize_mode) |
| else: |
| resize_video_cv2( |
| str(raw_output), |
| str(final_output), |
| target_w, |
| target_h, |
| resize_mode, |
| fallback_fps=float(job.get("fps", 30.0) or 30.0), |
| ) |
| except subprocess.CalledProcessError as exc: |
| raise RuntimeError(tail_text(exc.stderr or str(exc), 2500)) from exc |
|
|
| final_meta = probe_media(str(final_output)) |
| return { |
| "raw_width": raw_meta["width"], |
| "raw_height": raw_meta["height"], |
| "final_width": final_meta["width"], |
| "final_height": final_meta["height"], |
| "path": str(final_output), |
| } |
|
|
|
|
| def run_seedvr_job_core(job: Dict[str, Any]) -> Tuple[str, str]: |
| if not job: |
| raise gr.Error("任务状态为空,请重新点击运行。") |
|
|
| ensure_seedvr_backend() |
| cmd = build_cli_command(job) |
| env = os.environ.copy() |
| env.setdefault("PYTHONUNBUFFERED", "1") |
|
|
| proc = subprocess.run( |
| cmd, |
| cwd=str(BACKEND_DIR), |
| env=env, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| text=True, |
| ) |
| logs = ((proc.stdout or "") + "\n" + (proc.stderr or "")).strip() |
| if proc.returncode != 0: |
| raise gr.Error("SeedVR2 运行失败:\n\n" + tail_text(logs, 5000)) |
|
|
| result_meta = finalize_output(job) |
| summary_lines = [ |
| f"任务完成:{job['job_id']}", |
| f"模型:{job['model_repo']} / {job['model_file']}", |
| f"原始输入:{job['source_width']}x{job['source_height']}", |
| f"SeedVR2 直接输出:{result_meta['raw_width']}x{result_meta['raw_height']}", |
| f"最终输出:{result_meta['final_width']}x{result_meta['final_height']}", |
| f"尺寸策略:{RESIZE_MODE_LABELS[job['resize_mode']]}", |
| f"输出文件:{result_meta['path']}", |
| ] |
| if job["kind"] == "video": |
| summary_lines.append( |
| f"视频参数:batch_size={job['batch_size']} / temporal_overlap={job['temporal_overlap']} / chunk_size={job['chunk_size']}" |
| ) |
| summary_lines.append("") |
| summary_lines.append("执行日志(末尾截断):") |
| summary_lines.append(tail_text(logs, 5000) or "<无日志>") |
| return result_meta["path"], "\n".join(summary_lines) |
|
|
|
|
| @spaces.GPU(duration=estimate_job_duration) |
| def run_image_job(job: Dict[str, Any]): |
| output_path, summary = run_seedvr_job_core(job) |
| return output_path, output_path, summary |
|
|
|
|
| @spaces.GPU(duration=estimate_job_duration) |
| def run_video_job(job: Dict[str, Any]): |
| output_path, summary = run_seedvr_job_core(job) |
| return output_path, output_path, summary |
|
|
|
|
| def prepare_image_job( |
| image_path: str, |
| model_repo: str, |
| model_file: str, |
| scale_factor: Any, |
| out_w: Any, |
| out_h: Any, |
| resize_mode: str, |
| color_correction: str, |
| ): |
| return build_job( |
| input_path=image_path, |
| model_repo=model_repo, |
| model_file=model_file, |
| scale_factor=scale_factor, |
| out_w=out_w, |
| out_h=out_h, |
| resize_mode=resize_mode, |
| color_correction=color_correction, |
| expected_kind="image", |
| ) |
|
|
|
|
| def prepare_video_job( |
| video_path: str, |
| model_repo: str, |
| model_file: str, |
| scale_factor: Any, |
| out_w: Any, |
| out_h: Any, |
| resize_mode: str, |
| color_correction: str, |
| batch_size: int, |
| temporal_overlap: int, |
| chunk_size: Any, |
| ): |
| return build_job( |
| input_path=video_path, |
| model_repo=model_repo, |
| model_file=model_file, |
| scale_factor=scale_factor, |
| out_w=out_w, |
| out_h=out_h, |
| resize_mode=resize_mode, |
| color_correction=color_correction, |
| expected_kind="video", |
| batch_size=batch_size, |
| temporal_overlap=temporal_overlap, |
| chunk_size=chunk_size, |
| ) |
|
|
|
|
| def on_repo_change(repo_id: str): |
| return update_model_dropdown(repo_id, force=False) |
|
|
|
|
| def on_repo_refresh(repo_id: str): |
| return update_model_dropdown(repo_id, force=True) |
|
|
|
|
| INITIAL_REPO = "numz/SeedVR2_comfyUI" |
| INITIAL_MODELS = fetch_models_from_repo(INITIAL_REPO) |
| INITIAL_MODEL = choose_default_model(INITIAL_REPO, INITIAL_MODELS) |
|
|
|
|
| with gr.Blocks(title="SeedVR2 ZeroGPU Space", fill_width=True) as demo: |
| gr.Markdown( |
| "# SeedVR2 ZeroGPU 超分\n" |
| "使用官方 `ComfyUI-SeedVR2_VideoUpscaler` CLI 作为后端,支持图片/视频、" |
| "`numz/SeedVR2_comfyUI` 的 `.safetensors` 与 `cmeka/SeedVR2-GGUF` 的 `.gguf`。\n\n" |
| "- **超分倍率**:当输出宽高为空时生效\n" |
| "- **自定义输出分辨率**:宽高可都填,也可只填一个\n" |
| "- **输出尺寸策略**:支持补边 / 裁切 / 拉伸\n" |
| "- **ZeroGPU**:模型下载和文件准备走 CPU,真正推理阶段才申请 GPU" |
| ) |
|
|
| with gr.Tab("图片"): |
| image_job_state = gr.State() |
| with gr.Row(): |
| image_input = gr.File( |
| label="上传图片", |
| file_count="single", |
| type="filepath", |
| file_types=sorted(IMAGE_EXTS), |
| ) |
| image_preview = gr.Image(label="输出预览", type="filepath") |
|
|
| with gr.Row(): |
| image_repo = gr.Dropdown( |
| label="模型仓库", |
| choices=list(MODEL_SOURCES.keys()), |
| value=INITIAL_REPO, |
| ) |
| image_model = gr.Dropdown( |
| label="模型文件", |
| choices=INITIAL_MODELS, |
| value=INITIAL_MODEL, |
| allow_custom_value=False, |
| ) |
| image_refresh = gr.Button("刷新模型列表") |
|
|
| with gr.Row(): |
| image_scale = gr.Number(label="超分倍率", value=2.0, precision=3) |
| image_out_w = gr.Number(label="输出宽度(可选)", value=None, precision=0) |
| image_out_h = gr.Number(label="输出高度(可选)", value=None, precision=0) |
|
|
| with gr.Row(): |
| image_resize_mode = gr.Dropdown( |
| label="输出尺寸策略", |
| choices=[ |
| ("保持比例并补边", "pad"), |
| ("保持比例并裁切", "crop"), |
| ("强制拉伸到目标尺寸", "stretch"), |
| ], |
| value="pad", |
| ) |
| image_color = gr.Dropdown( |
| label="颜色校正", |
| choices=["lab", "wavelet", "wavelet_adaptive", "hsv", "adain", "none"], |
| value="lab", |
| ) |
|
|
| image_run = gr.Button("开始图片超分", variant="primary") |
| image_file_out = gr.File(label="下载结果") |
| image_status = gr.Textbox(label="运行日志", lines=18) |
|
|
| image_repo.change(on_repo_change, inputs=image_repo, outputs=image_model) |
| image_refresh.click(on_repo_refresh, inputs=image_repo, outputs=image_model) |
| image_run.click( |
| prepare_image_job, |
| inputs=[ |
| image_input, |
| image_repo, |
| image_model, |
| image_scale, |
| image_out_w, |
| image_out_h, |
| image_resize_mode, |
| image_color, |
| ], |
| outputs=[image_job_state, image_status], |
| ).then( |
| run_image_job, |
| inputs=image_job_state, |
| outputs=[image_preview, image_file_out, image_status], |
| ) |
|
|
| with gr.Tab("视频"): |
| video_job_state = gr.State() |
| with gr.Row(): |
| video_input = gr.File( |
| label="上传视频", |
| file_count="single", |
| type="filepath", |
| file_types=sorted(VIDEO_EXTS), |
| ) |
| video_preview = gr.Video(label="输出预览") |
|
|
| with gr.Row(): |
| video_repo = gr.Dropdown( |
| label="模型仓库", |
| choices=list(MODEL_SOURCES.keys()), |
| value=INITIAL_REPO, |
| ) |
| video_model = gr.Dropdown( |
| label="模型文件", |
| choices=INITIAL_MODELS, |
| value=INITIAL_MODEL, |
| allow_custom_value=False, |
| ) |
| video_refresh = gr.Button("刷新模型列表") |
|
|
| with gr.Row(): |
| video_scale = gr.Number(label="超分倍率", value=2.0, precision=3) |
| video_out_w = gr.Number(label="输出宽度(可选)", value=None, precision=0) |
| video_out_h = gr.Number(label="输出高度(可选)", value=None, precision=0) |
|
|
| with gr.Row(): |
| video_resize_mode = gr.Dropdown( |
| label="输出尺寸策略", |
| choices=[ |
| ("保持比例并补边", "pad"), |
| ("保持比例并裁切", "crop"), |
| ("强制拉伸到目标尺寸", "stretch"), |
| ], |
| value="pad", |
| ) |
| video_color = gr.Dropdown( |
| label="颜色校正", |
| choices=["lab", "wavelet", "wavelet_adaptive", "hsv", "adain", "none"], |
| value="lab", |
| ) |
|
|
| with gr.Row(): |
| video_batch = gr.Dropdown( |
| label="batch_size(必须是 4n+1)", |
| choices=[1, 5, 9, 13, 17, 21, 25, 33], |
| value=5, |
| ) |
| video_overlap = gr.Slider( |
| label="temporal_overlap", |
| minimum=0, |
| maximum=16, |
| step=1, |
| value=3, |
| ) |
| video_chunk = gr.Number( |
| label="chunk_size(0=整段加载)", |
| value=0, |
| precision=0, |
| ) |
|
|
| video_run = gr.Button("开始视频超分", variant="primary") |
| video_file_out = gr.File(label="下载结果") |
| video_status = gr.Textbox(label="运行日志", lines=20) |
|
|
| video_repo.change(on_repo_change, inputs=video_repo, outputs=video_model) |
| video_refresh.click(on_repo_refresh, inputs=video_repo, outputs=video_model) |
| video_run.click( |
| prepare_video_job, |
| inputs=[ |
| video_input, |
| video_repo, |
| video_model, |
| video_scale, |
| video_out_w, |
| video_out_h, |
| video_resize_mode, |
| video_color, |
| video_batch, |
| video_overlap, |
| video_chunk, |
| ], |
| outputs=[video_job_state, video_status], |
| ).then( |
| run_video_job, |
| inputs=video_job_state, |
| outputs=[video_preview, video_file_out, video_status], |
| ) |
|
|
| gr.Markdown( |
| "### 说明\n" |
| "1. `numz/SeedVR2_comfyUI` 的 VAE 会自动下载到 `models/SEEDVR2`。\n" |
| "2. `cmeka/SeedVR2-GGUF` 只提供 GGUF DiT,因此仍会同时下载官方 VAE。\n" |
| "3. 若仓库后续新增模型,点 **刷新模型列表** 就能拉到最新文件名。\n" |
| "4. 没有填写输出宽高时,使用倍率计算目标分辨率;填写宽/高后会优先按宽高输出。" |
| ) |
|
|
| demo.queue(default_concurrency_limit=1, max_size=16) |
|
|
| if __name__ == "__main__": |
| demo.launch() |
|
|