File size: 3,154 Bytes
5d3abae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b047e34
5d3abae
 
b047e34
5d3abae
 
 
 
b047e34
 
 
 
5d3abae
b047e34
5d3abae
b047e34
 
 
 
 
 
5d3abae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b047e34
5d3abae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b047e34
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from __future__ import annotations

import os
import shutil
import subprocess
from dataclasses import dataclass
from datetime import datetime


@dataclass
class PreprocessResult:
    input_path: str
    output_path: str
    used_ffmpeg: bool
    message: str
    mode: str
    ffmpeg_cmd: list[str]


def check_ffmpeg_available() -> bool:
    return shutil.which("ffmpeg") is not None


def ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)


def build_preprocessed_video_path(output_dir: str, input_path: str, mode: str) -> str:
    ensure_dir(output_dir)
    stem = os.path.splitext(os.path.basename(input_path))[0]
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    return os.path.join(output_dir, f"{stem}_{mode}_preprocessed_{ts}.mp4")


def _build_ffmpeg_cmd(
    input_path: str,
    output_path: str,
    *,
    mode: str = "preview",
    remove_audio: bool = False,
) -> list[str]:
    mode = (mode or "preview").lower().strip()
    if mode not in {"preview", "analysis", "archival"}:
        raise ValueError(f"不支持的预处理模式: {mode}")

    if mode == "preview":
        # 更适合 Hugging Face Space
        width = 480
        fps = 8
        crf = 30
        preset = "veryfast"
        audio_bitrate = "64k"
    elif mode == "analysis":
        width = 960
        fps = 12
        crf = 24
        preset = "fast"
        audio_bitrate = "96k"
    else:
        width = 1280
        fps = 15
        crf = 18
        preset = "slow"
        audio_bitrate = "128k"

    vf = f"scale='min({width},iw)':-2,fps={fps}"

    cmd = [
        "ffmpeg",
        "-y",
        "-i", input_path,
        "-vf", vf,
        "-c:v", "libx264",
        "-preset", preset,
        "-crf", str(crf),
        "-movflags", "+faststart",
        "-pix_fmt", "yuv420p",
    ]

    if remove_audio:
        cmd += ["-an"]
    else:
        cmd += [
            "-c:a", "aac",
            "-b:a", audio_bitrate,
            "-ac", "1",
            "-ar", "16000",
        ]

    cmd.append(output_path)
    return cmd


def preprocess_video(
    input_path: str,
    output_dir: str,
    mode: str = "preview",
    remove_audio: bool = False,
) -> PreprocessResult:
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"视频文件不存在: {input_path}")

    if not check_ffmpeg_available():
        raise RuntimeError("未检测到 ffmpeg,请先安装 ffmpeg。")

    output_path = build_preprocessed_video_path(output_dir, input_path, mode)
    cmd = _build_ffmpeg_cmd(
        input_path=input_path,
        output_path=output_path,
        mode=mode,
        remove_audio=remove_audio,
    )

    proc = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    if proc.returncode != 0:
        raise RuntimeError(f"ffmpeg 预处理失败:\n{proc.stderr}")

    return PreprocessResult(
        input_path=input_path,
        output_path=output_path,
        used_ffmpeg=True,
        message=f"视频预处理完成(mode={mode}, audio={'removed' if remove_audio else 'kept'})",
        mode=mode,
        ffmpeg_cmd=cmd,
    )