"""Public Python API for simple offline video-to-video inference.""" from __future__ import annotations from contextlib import ExitStack from importlib.resources import as_file, files import os from pathlib import Path import shutil import socket import subprocess import sys import tempfile from typing import Literal, Sequence import torch from streamv2v.inference_common import load_mp4_as_tensor, normalize_acceleration_flags InferenceMode = Literal["single", "single-wo", "pipe"] _SINGLE_MODE_TO_MODULE = { "single": "streamv2v.inference", "single-wo": "streamv2v.inference_wo_batch", } def _resolve_default_config_path(resource_stack: ExitStack) -> str: resource = files("streamv2v.configs").joinpath("wan_causal_dmd_v2v.yaml") return str(resource_stack.enter_context(as_file(resource))) def _normalize_gpu_ids(gpu_ids: int | Sequence[int] | None) -> list[int] | None: if gpu_ids is None: return None if isinstance(gpu_ids, int): return [gpu_ids] return [int(gpu_id) for gpu_id in gpu_ids] def _normalize_device_gpu_id(device: str | torch.device | None) -> list[int] | None: if device is None: return None device_str = str(device) if not device_str.startswith("cuda:"): return None return [int(device_str.split(":", 1)[1])] def _resolve_single_gpu_id(gpu_ids: list[int] | None) -> int | None: if gpu_ids is None: return None if len(gpu_ids) != 1: raise ValueError("single and single-wo modes accept exactly one GPU id") return int(gpu_ids[0]) def _pick_free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.bind(("127.0.0.1", 0)) return int(sock.getsockname()[1]) def _build_common_args( *, config_path: str, checkpoint_folder: str, video_path: str, prompt_file_path: str, output_folder: str, noise_scale: float, height: int, width: int, fps: int, step: int, seed: int, model_type: str, profile: bool, use_taehv: bool, use_tensorrt: bool, fast: bool, ) -> list[str]: args = [ "--config_path", config_path, "--checkpoint_folder", checkpoint_folder, "--output_folder", output_folder, "--prompt_file_path", prompt_file_path, "--video_path", video_path, "--noise_scale", str(noise_scale), "--height", str(height), "--width", str(width), "--fps", str(fps), "--step", str(step), "--seed", str(seed), "--model_type", model_type, ] if profile: args.append("--profile") if use_taehv: args.append("--use_taehv") if use_tensorrt: args.append("--use_tensorrt") if fast: args.append("--fast") return args class StreamVideoToVideo: """Convenience wrapper around the offline Python entrypoints.""" def __init__( self, checkpoint_folder: str, mode: InferenceMode = "single", *, config_path: str | None = None, device: str | torch.device | None = None, gpu_ids: int | Sequence[int] | None = None, num_gpus: int | None = None, noise_scale: float = 0.8, height: int = 480, width: int = 832, fps: int = 16, step: int = 2, seed: int = 0, model_type: str = "T2V-1.3B", use_taehv: bool = False, use_tensorrt: bool = False, fast: bool = False, profile: bool = False, schedule_block: bool = False, ) -> None: self.checkpoint_folder = checkpoint_folder self.mode = mode self.config_path = config_path self.device = device self.gpu_ids = gpu_ids self.num_gpus = num_gpus self.noise_scale = noise_scale self.height = height self.width = width self.fps = fps self.step = step self.seed = seed self.model_type = model_type self.use_taehv = use_taehv self.use_tensorrt = use_tensorrt self.fast = fast self.profile = profile self.schedule_block = schedule_block def generate(self, video_path: str, prompt: str) -> torch.Tensor: with tempfile.TemporaryDirectory(prefix="streamv2v_generate_") as temp_dir: output_path = os.path.join(temp_dir, "output.mp4") self.run_video(video_path=video_path, prompt=prompt, output_path=output_path) return load_mp4_as_tensor(output_path, normalize=False) def run_video(self, video_path: str, prompt: str, output_path: str) -> str: return run_video_to_video( checkpoint_folder=self.checkpoint_folder, video_path=video_path, prompt=prompt, output_path=output_path, mode=self.mode, config_path=self.config_path, device=self.device, gpu_ids=self.gpu_ids, num_gpus=self.num_gpus, noise_scale=self.noise_scale, height=self.height, width=self.width, fps=self.fps, step=self.step, seed=self.seed, model_type=self.model_type, use_taehv=self.use_taehv, use_tensorrt=self.use_tensorrt, fast=self.fast, profile=self.profile, schedule_block=self.schedule_block, ) def run_video_to_video( *, checkpoint_folder: str, video_path: str, prompt: str, output_path: str, mode: InferenceMode = "single", config_path: str | None = None, device: str | torch.device | None = None, gpu_ids: int | Sequence[int] | None = None, num_gpus: int | None = None, noise_scale: float = 0.8, height: int = 480, width: int = 832, fps: int = 16, step: int = 2, seed: int = 0, model_type: str = "T2V-1.3B", use_taehv: bool = False, use_tensorrt: bool = False, fast: bool = False, profile: bool = False, schedule_block: bool = False, ) -> str: """Run offline video-to-video inference from Python.""" flags = normalize_acceleration_flags( { "use_taehv": use_taehv, "use_tensorrt": use_tensorrt, "fast": fast, } ) use_taehv = bool(flags["use_taehv"]) use_tensorrt = bool(flags["use_tensorrt"]) fast = bool(flags["fast"]) requested_gpu_ids = _normalize_gpu_ids(gpu_ids) device_gpu_ids = _normalize_device_gpu_id(device) if requested_gpu_ids is None and device_gpu_ids is not None: requested_gpu_ids = device_gpu_ids if mode == "pipe": if num_gpus is None: num_gpus = len(requested_gpu_ids) if requested_gpu_ids is not None else 2 if requested_gpu_ids is not None and len(requested_gpu_ids) != num_gpus: raise ValueError("num_gpus must match len(gpu_ids) for pipe mode") elif num_gpus is not None and num_gpus != 1: raise ValueError("num_gpus is only used for pipe mode") resource_stack = ExitStack() try: resolved_config_path = config_path or _resolve_default_config_path(resource_stack) output_file = Path(output_path) output_file.parent.mkdir(parents=True, exist_ok=True) with tempfile.TemporaryDirectory(prefix="streamv2v_api_") as temp_dir: temp_dir_path = Path(temp_dir) prompt_path = temp_dir_path / "prompt.txt" prompt_path.write_text(prompt + "\n", encoding="utf-8") temp_output_dir = temp_dir_path / "outputs" temp_output_dir.mkdir(parents=True, exist_ok=True) common_args = _build_common_args( config_path=resolved_config_path, checkpoint_folder=checkpoint_folder, video_path=video_path, prompt_file_path=str(prompt_path), output_folder=str(temp_output_dir), noise_scale=noise_scale, height=height, width=width, fps=fps, step=step, seed=seed, model_type=model_type, profile=profile, use_taehv=use_taehv, use_tensorrt=use_tensorrt, fast=fast, ) env = os.environ.copy() if requested_gpu_ids is not None and mode == "pipe": env["CUDA_VISIBLE_DEVICES"] = ",".join(str(gpu_id) for gpu_id in requested_gpu_ids) if mode == "pipe": cmd = [ sys.executable, "-m", "torch.distributed.run", f"--nproc_per_node={num_gpus}", f"--master_port={_pick_free_port()}", "-m", "streamv2v.inference_pipe", *common_args, ] if schedule_block: cmd.append("--schedule_block") else: module_name = _SINGLE_MODE_TO_MODULE.get(mode) if module_name is None: raise ValueError(f"Unsupported mode: {mode}") cmd = [sys.executable, "-m", module_name, *common_args] single_gpu_id = _resolve_single_gpu_id(requested_gpu_ids) if single_gpu_id is not None: cmd.extend(["--gpu_id", str(single_gpu_id)]) subprocess.run(cmd, env=env, check=True) generated_path = temp_output_dir / "output_000.mp4" shutil.copy2(generated_path, output_file) return str(output_file) finally: resource_stack.close()