# Scraped from a Hugging Face Space file view (Space status: Sleeping; file size: 3,154 bytes).
# Commit hashes shown in the page gutter: 5d3abae, b047e34.
from __future__ import annotations
import os
import shutil
import subprocess
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PreprocessResult:
    """Summary of a single video preprocessing run.

    ``preprocess_video`` builds one of these after ffmpeg has exited
    successfully.
    """

    # Path of the source video, exactly as supplied by the caller.
    input_path: str
    # Path of the transcoded .mp4 written by ffmpeg.
    output_path: str
    # Whether ffmpeg was invoked to produce the output.
    used_ffmpeg: bool
    # Human-readable status line describing the run.
    message: str
    # Name of the preprocessing mode that was requested.
    mode: str
    # Full ffmpeg argument list that was executed.
    ffmpeg_cmd: list[str]
def check_ffmpeg_available() -> bool:
    """Report whether an ``ffmpeg`` executable can be located via ``shutil.which``."""
    ffmpeg_location = shutil.which("ffmpeg")
    return ffmpeg_location is not None
def ensure_dir(path: str) -> None:
    """Create directory *path* (and any missing parents).

    An already existing directory is accepted silently.
    """
    os.makedirs(name=path, exist_ok=True)
def build_preprocessed_video_path(output_dir: str, input_path: str, mode: str) -> str:
    """Return a timestamped ``.mp4`` path inside *output_dir* for a preprocessed video.

    The file name has the form ``<input stem>_<mode>_preprocessed_<YYYYmmdd_HHMMSS>.mp4``.
    *output_dir* is created first if it does not exist yet.
    """
    ensure_dir(output_dir)

    source_name = os.path.basename(input_path)
    stem, _extension = os.path.splitext(source_name)
    # Local wall-clock time, second resolution.
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    file_name = f"{stem}_{mode}_preprocessed_{stamp}.mp4"
    return os.path.join(output_dir, file_name)
def _build_ffmpeg_cmd(
    input_path: str,
    output_path: str,
    *,
    mode: str = "preview",
    remove_audio: bool = False,
) -> list[str]:
    """Assemble the ffmpeg argument list for one preprocessing mode.

    Args:
        input_path: Video handed to ffmpeg via ``-i``.
        output_path: Destination file; always the last argument.
        mode: ``"preview"``, ``"analysis"`` or ``"archival"``. Case and
            surrounding whitespace are ignored; a falsy value means
            ``"preview"``.
        remove_audio: When true the audio track is dropped (``-an``);
            otherwise it is re-encoded to mono 16 kHz AAC.

    Returns:
        The command as a list suitable for ``subprocess.run``.

    Raises:
        ValueError: If *mode* is not one of the supported names.
    """
    # mode -> (max width, fps, crf, x264 preset, audio bitrate)
    profiles = {
        # Lightest profile; better suited to a Hugging Face Space.
        "preview": (480, 8, 30, "veryfast", "64k"),
        "analysis": (960, 12, 24, "fast", "96k"),
        "archival": (1280, 15, 18, "slow", "128k"),
    }

    mode = (mode or "preview").lower().strip()
    if mode not in profiles:
        raise ValueError(f"不支持的预处理模式: {mode}")
    width, fps, crf, preset, audio_bitrate = profiles[mode]

    # Cap the width at `width` without upscaling narrower inputs; -2 lets
    # ffmpeg pick an even height that keeps the aspect ratio.
    video_filter = f"scale='min({width},iw)':-2,fps={fps}"

    if remove_audio:
        audio_args = ["-an"]
    else:
        audio_args = ["-c:a", "aac", "-b:a", audio_bitrate, "-ac", "1", "-ar", "16000"]

    return [
        "ffmpeg",
        "-y",
        "-i", input_path,
        "-vf", video_filter,
        "-c:v", "libx264",
        "-preset", preset,
        "-crf", str(crf),
        "-movflags", "+faststart",
        "-pix_fmt", "yuv420p",
        *audio_args,
        output_path,
    ]
def preprocess_video(
    input_path: str,
    output_dir: str,
    mode: str = "preview",
    remove_audio: bool = False,
) -> PreprocessResult:
    """Transcode *input_path* with ffmpeg into a timestamped .mp4 in *output_dir*.

    Args:
        input_path: Existing video file to preprocess.
        output_dir: Directory for the result; created if missing.
        mode: Preprocessing profile name. It is lower-cased and stripped
            here (a falsy value means ``"preview"``) so that the output file
            name, the returned ``mode`` and the ffmpeg settings all agree.
        remove_audio: Drop the audio track instead of re-encoding it.

    Returns:
        A ``PreprocessResult`` describing the output file and the exact
        ffmpeg command that was run.

    Raises:
        FileNotFoundError: *input_path* does not exist or is a directory.
        RuntimeError: ffmpeg is not installed, or it exited with a non-zero
            status (its stderr is included in the message).
        ValueError: *mode* is not supported (raised by ``_build_ffmpeg_cmd``).
    """
    # A directory satisfies os.path.exists() but can never be transcoded;
    # reject it here rather than after the output directory has been created
    # and ffmpeg has been launched.
    if not os.path.exists(input_path) or os.path.isdir(input_path):
        raise FileNotFoundError(f"视频文件不存在: {input_path}")

    if not check_ffmpeg_available():
        raise RuntimeError("未检测到 ffmpeg,请先安装 ffmpeg。")

    # Normalize once, with the same rule _build_ffmpeg_cmd applies, so the
    # file name and the reported mode match what ffmpeg actually ran with.
    mode = (mode or "preview").lower().strip()

    output_path = build_preprocessed_video_path(output_dir, input_path, mode)
    cmd = _build_ffmpeg_cmd(
        input_path=input_path,
        output_path=output_path,
        mode=mode,
        remove_audio=remove_audio,
    )

    proc = subprocess.run(
        cmd,
        # ffmpeg reads interactive commands from stdin by default; detach it
        # so it cannot consume or block on the parent's stdin.
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        # ffmpeg echoes file metadata that may not be valid in the locale
        # encoding; never let decoding mask the real outcome.
        errors="replace",
    )

    if proc.returncode != 0:
        # Best-effort cleanup of a truncated output; the file may not exist
        # at all if ffmpeg failed before opening it.
        try:
            os.remove(output_path)
        except OSError:
            pass
        raise RuntimeError(f"ffmpeg 预处理失败:\n{proc.stderr}")

    audio_state = "removed" if remove_audio else "kept"
    return PreprocessResult(
        input_path=input_path,
        output_path=output_path,
        used_ffmpeg=True,
        message=f"视频预处理完成(mode={mode}, audio={audio_state})",
        mode=mode,
        ffmpeg_cmd=cmd,
    )