from __future__ import annotations import os import shutil import subprocess from dataclasses import dataclass from datetime import datetime @dataclass class PreprocessResult: input_path: str output_path: str used_ffmpeg: bool message: str mode: str ffmpeg_cmd: list[str] def check_ffmpeg_available() -> bool: return shutil.which("ffmpeg") is not None def ensure_dir(path: str) -> None: os.makedirs(path, exist_ok=True) def build_preprocessed_video_path(output_dir: str, input_path: str, mode: str) -> str: ensure_dir(output_dir) stem = os.path.splitext(os.path.basename(input_path))[0] ts = datetime.now().strftime("%Y%m%d_%H%M%S") return os.path.join(output_dir, f"{stem}_{mode}_preprocessed_{ts}.mp4") def _build_ffmpeg_cmd( input_path: str, output_path: str, *, mode: str = "preview", remove_audio: bool = False, ) -> list[str]: mode = (mode or "preview").lower().strip() if mode not in {"preview", "analysis", "archival"}: raise ValueError(f"不支持的预处理模式: {mode}") if mode == "preview": # 更适合 Hugging Face Space width = 480 fps = 8 crf = 30 preset = "veryfast" audio_bitrate = "64k" elif mode == "analysis": width = 960 fps = 12 crf = 24 preset = "fast" audio_bitrate = "96k" else: width = 1280 fps = 15 crf = 18 preset = "slow" audio_bitrate = "128k" vf = f"scale='min({width},iw)':-2,fps={fps}" cmd = [ "ffmpeg", "-y", "-i", input_path, "-vf", vf, "-c:v", "libx264", "-preset", preset, "-crf", str(crf), "-movflags", "+faststart", "-pix_fmt", "yuv420p", ] if remove_audio: cmd += ["-an"] else: cmd += [ "-c:a", "aac", "-b:a", audio_bitrate, "-ac", "1", "-ar", "16000", ] cmd.append(output_path) return cmd def preprocess_video( input_path: str, output_dir: str, mode: str = "preview", remove_audio: bool = False, ) -> PreprocessResult: if not os.path.exists(input_path): raise FileNotFoundError(f"视频文件不存在: {input_path}") if not check_ffmpeg_available(): raise RuntimeError("未检测到 ffmpeg,请先安装 ffmpeg。") output_path = build_preprocessed_video_path(output_dir, input_path, mode) cmd = _build_ffmpeg_cmd( input_path=input_path, output_path=output_path, mode=mode, remove_audio=remove_audio, ) proc = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) if proc.returncode != 0: raise RuntimeError(f"ffmpeg 预处理失败:\n{proc.stderr}") return PreprocessResult( input_path=input_path, output_path=output_path, used_ffmpeg=True, message=f"视频预处理完成(mode={mode}, audio={'removed' if remove_audio else 'kept'})", mode=mode, ffmpeg_cmd=cmd, )