File size: 3,282 Bytes
101858b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad2ce18
 
101858b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad2ce18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from __future__ import annotations

import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path

from PIL import Image


@dataclass
class EncodeResult:
    path: Path
    encoder: str
    mp4: bool
    gif_path: Path | None = None
    audio_path: Path | None = None
    muxed_audio: bool = False


def ffmpeg_available() -> bool:
    return shutil.which("ffmpeg") is not None


def encode_video(frames: list[Image.Image], output_path: str | Path, *, fps: int, export_gif: bool = False) -> EncodeResult:
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    fps = max(1, int(fps))
    if path.suffix.lower() == ".gif":
        frames[0].save(path, save_all=True, append_images=frames[1:], duration=int(1000 / fps), loop=0)
        return EncodeResult(path, "pil_gif", False, path)
    try:
        import imageio.v3 as iio
        import numpy as np

        iio.imwrite(path, [np.asarray(frame.convert("RGB")) for frame in frames], fps=fps)
        gif_path = _write_gif(frames, path.with_suffix(".gif"), fps) if export_gif else None
        return EncodeResult(path, "imageio", path.suffix.lower() == ".mp4", gif_path)
    except Exception:
        pass
    ffmpeg = shutil.which("ffmpeg")
    if ffmpeg:
        tmp = path.parent / f"{path.stem}_frames"
        tmp.mkdir(parents=True, exist_ok=True)
        for idx, frame in enumerate(frames):
            frame.save(tmp / f"frame_{idx:05d}.png")
        subprocess.run(
            [
                ffmpeg,
                "-y",
                "-framerate",
                str(fps),
                "-i",
                str(tmp / "frame_%05d.png"),
                "-pix_fmt",
                "yuv420p",
                str(path),
            ],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        gif_path = _write_gif(frames, path.with_suffix(".gif"), fps) if export_gif else None
        return EncodeResult(path, "ffmpeg", path.suffix.lower() == ".mp4", gif_path)
    fallback = path.with_suffix(".gif")
    _write_gif(frames, fallback, fps)
    return EncodeResult(fallback, "pil_gif_fallback", False, fallback)


def _write_gif(frames: list[Image.Image], path: Path, fps: int) -> Path:
    frames[0].save(path, save_all=True, append_images=frames[1:], duration=int(1000 / fps), loop=0)
    return path


def mux_audio(video_path: str | Path, audio_path: str | Path, output_path: str | Path | None = None) -> Path | None:
    ffmpeg = shutil.which("ffmpeg")
    if not ffmpeg:
        return None
    video = Path(video_path)
    audio = Path(audio_path)
    if not video.exists() or not audio.exists():
        return None
    out = Path(output_path) if output_path is not None else video.with_name(f"{video.stem}_with_audio{video.suffix}")
    out.parent.mkdir(parents=True, exist_ok=True)
    command = [
        ffmpeg,
        "-y",
        "-i",
        str(video),
        "-i",
        str(audio),
        "-map",
        "0:v:0",
        "-map",
        "1:a:0",
        "-c:v",
        "copy",
        "-c:a",
        "aac",
        "-shortest",
        str(out),
    ]
    subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return out