File size: 7,430 Bytes
a4a12a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7bb210
 
a4a12a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9075525
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a4a12a5
 
9075525
a4a12a5
 
 
 
 
 
9075525
 
 
 
a4a12a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9075525
 
 
 
 
a4a12a5
 
 
9075525
 
a4a12a5
 
 
 
9075525
a4a12a5
 
 
 
 
 
 
 
 
1f358f3
9075525
 
 
 
 
a4a12a5
 
9075525
a4a12a5
9075525
1f358f3
a4a12a5
 
9075525
a4a12a5
 
 
 
 
fb45b53
a4a12a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# app.py — Re-encode Frames to Video (FFmpeg only)

import os, zipfile, tempfile, subprocess, base64
from pathlib import Path
from typing import List, Optional, Tuple
import gradio as gr

def try_load_logo_b64() -> str:
    try:
        with open("bifrost_logo.png", "rb") as f:
            import base64
            return base64.b64encode(f.read()).decode("utf-8")
    except Exception:
        return ""
LOGO_B64 = try_load_logo_b64()

def render_logo_html(px: int = 96) -> str:
    img = f'<img src="data:image/png;base64,{LOGO_B64}" style="height:{px}px;width:auto;" />' if LOGO_B64 else ""
    return f"""
    <div style="display:flex;align-items:center;gap:16px;">
      {img}
      <div>
        <div style="font-size:1.6rem;font-weight:800;">Valknut · Re-encode Images-to-Video</div>
        <div style="opacity:0.8;">Valknut binds frames and sound, encoding into one unified whole</div>
      </div>
    </div>
    <hr>
    """

def _which(name: str) -> Optional[str]:
    from shutil import which
    return which(name)

FFMPEG = _which("ffmpeg")
FFPROBE = _which("ffprobe")
MISSING_MSG = "" if (FFMPEG and FFPROBE) else (
    "⚠️ FFmpeg/FFprobe not found. Add a 'packages.txt' with:\n"
    "ffmpeg\nlibsm6\nlibxext6\nand restart the Space."
)

def render_progress(pct: float, label: str = "") -> str:
    pct = max(0.0, min(100.0, pct))
    return f'''<div style="width:100%;border:1px solid #ddd;border-radius:8px;overflow:hidden;height:18px;">
<div style="height:100%;width:{pct:.1f}%;background:#3b82f6;"></div></div>
<div style="font-size:12px;opacity:.8;margin-top:4px;">{label} {pct:.1f}%</div>'''

def prepare_frames_from_upload(files: List[gr.File] | None, prefix: str = "enc") -> Tuple[Optional[str], Optional[str]]:
    if not files:
        return None, None
    work = Path(tempfile.mkdtemp(prefix="enc_"))
    frames_dir = work / "frames"; frames_dir.mkdir(parents=True, exist_ok=True)
    detected_prefix = None

    if len(files) == 1 and Path(files[0].name).suffix.lower() == ".zip":
        with zipfile.ZipFile(files[0].name, "r") as zf:
            zf.extractall(frames_dir)
        imgs = sorted(frames_dir.glob("*.jpg")) + sorted(frames_dir.glob("*.png"))
        if imgs:
            detected_prefix = Path(imgs[0]).stem.split("_")[0]
        return str(frames_dir), detected_prefix or prefix

    counter = 1
    for f in files:
        src = Path(f.name)
        if src.suffix.lower() not in [".jpg", ".jpeg", ".png"]:
            continue
        dst = frames_dir / f"{prefix}_{counter:05d}{src.suffix.lower()}"
        src.replace(dst) if src.exists() else None
        counter += 1
    return str(frames_dir), prefix

def build_ffmpeg_encode(frames_dir: str, prefix: str, fps: float, fmt: str,
                        include_audio: bool, orig_video: str | None) -> list[str]:
    jpgs = sorted(Path(frames_dir).glob(f"{prefix}_*.jpg"))
    pngs = sorted(Path(frames_dir).glob(f"{prefix}_*.png"))
    imgs = jpgs if jpgs else pngs
    if not imgs:
        return []

    first_frame = imgs[0].name
    pattern = str(imgs[0].with_name(f"{prefix}_%05d{imgs[0].suffix}"))
    start_num = int(Path(first_frame).stem.split("_")[-1])

    args = [FFMPEG, "-y", "-start_number", str(start_num),
            "-framerate", f"{fps:.6f}", "-i", pattern]

    if include_audio and orig_video:
        args += ["-i", orig_video, "-map", "0:v:0", "-map", "1:a:0", "-shortest"]

    if fmt == "h265":
        vcodec = ["-c:v", "libx265"]
    elif fmt == "vp9":
        vcodec = ["-c:v", "libvpx-vp9"]
    else:
        vcodec = ["-c:v", "libx264"]

    out_name = "output.mp4" if fmt in ("h264", "h265") else "output.webm"
    return args + vcodec + ["-pix_fmt", "yuv420p", "-crf", "18", out_name]


def step3_encode(
    uploaded_frames: List[gr.File] | None,
    uploaded_audio_video: gr.File | None,
    fps: float,
    fmt: str,
    include_audio: bool,
    prog_html: str,
):
    if not (FFMPEG and FFPROBE):
        yield None, "FFmpeg/FFprobe missing. See note below.", prog_html
        return
    frames_dir, prefix = prepare_frames_from_upload(uploaded_frames, "enc")
    if not frames_dir or not prefix:
        yield None, "No frames provided. Upload a ZIP or images.", prog_html
        return

    orig_path = uploaded_audio_video.name if uploaded_audio_video else None
    cmd = build_ffmpeg_encode(frames_dir, prefix, float(fps or 30.0), fmt, include_audio, orig_path)

    # Add progress pipe and run in frames_dir for clean output pathing
    cmd = [cmd[0], "-progress", "pipe:2"] + cmd[1:]
    proc = subprocess.Popen(
        cmd, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL,
        text=True, bufsize=1, cwd=frames_dir
    )
    
    # Approximate by counting files
    total_frames = len(list(Path(frames_dir).glob(f"{prefix}_*.jpg"))) + len(list(Path(frames_dir).glob(f"{prefix}_*.png")))
    current = 0
    errors = []   # ✅ collect errors while streaming
    
    while True:
        line = proc.stderr.readline()
        if not line and proc.poll() is not None:
            break
    
        if "frame=" in line:
            try:
                current = int(line.strip().split("=")[-1])
            except Exception:
                pass
            if total_frames > 0:
                pct = min(100.0, (current / total_frames) * 100.0)
                yield None, f"Encoding… {current}/{total_frames} frames", render_progress(pct, f"Encoding {pct:.0f}%")
            else:
                yield None, "Encoding…", render_progress(50.0, "Encoding…")
    
        # ✅ collect suspicious lines (errors, missing files, etc.)
        if "Error" in line or "No such file" in line:
            errors.append(line.strip())
    
    ret = proc.wait()
    out_file = Path(frames_dir) / ("output.mp4" if fmt in ("h264", "h265") else "output.webm")
    
    if ret != 0 or not out_file.exists():
        err = "\n".join(errors) or "Unknown ffmpeg error."
        yield None, f"Encoding failed.\n\n{err}", render_progress(0.0, "Failed")
        return


    yield str(out_file), f"Video created: {out_file.name}", render_progress(100.0, "Encoding complete")

def build_ui():
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.HTML(render_logo_html(88))
        gr.Markdown("Re-encode a folder/ZIP of frames into a video. Optionally mix audio from a video/audio file.")

        uploaded_frames = gr.Files(label="Upload frames (ZIP or images)", type="filepath")
        uploaded_audio = gr.File(label="Optional: video/audio for audio track", file_types=[".mp4",".mov",".mkv",".webm",".mp3",".wav"], type="filepath")

        with gr.Row():
            fps = gr.Number(value=30.0, label="FPS")
            fmt = gr.Dropdown(["h264", "h265", "vp9"], value="h264", label="Video format")
            include_audio = gr.Checkbox(True, label="Include audio if available")

        btn_encode = gr.Button("Create Video", variant="primary")
        prog = gr.HTML(render_progress(0.0, "Idle"))
        video_player = gr.Video(label="Output video")
        details = gr.Markdown("")

        btn_encode.click(
            step3_encode,
            inputs=[uploaded_frames, uploaded_audio, fps, fmt, include_audio, prog],
            outputs=[video_player, details, prog],
        )

        if MISSING_MSG:
            gr.Markdown(f"<span style='color:#b45309'>{MISSING_MSG}</span>")

    return demo

if __name__ == "__main__":
    build_ui().queue().launch()