File size: 7,430 Bytes
a4a12a5 f7bb210 a4a12a5 9075525 a4a12a5 9075525 a4a12a5 9075525 a4a12a5 9075525 a4a12a5 9075525 a4a12a5 9075525 a4a12a5 1f358f3 9075525 a4a12a5 9075525 a4a12a5 9075525 1f358f3 a4a12a5 9075525 a4a12a5 fb45b53 a4a12a5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# app.py — Re-encode Frames to Video (FFmpeg only)
import os, zipfile, tempfile, subprocess, base64
from pathlib import Path
from typing import List, Optional, Tuple
import gradio as gr
def try_load_logo_b64() -> str:
    """Return the logo image as a base64 string, or "" when it cannot be read.

    Reads ``bifrost_logo.png`` relative to the current working directory. The
    logo is optional decoration, so a missing or unreadable file is not an
    error: the function degrades to an empty string.
    """
    try:
        with open("bifrost_logo.png", "rb") as f:
            raw = f.read()
    except OSError:
        # Only file-system failures are expected here (missing file, bad
        # permissions, path is a directory). Anything else is a real bug and
        # should surface instead of being swallowed.
        return ""
    return base64.b64encode(raw).decode("utf-8")
# Base64 payload of the logo, loaded once at import time; "" when the logo file could not be read.
LOGO_B64: str = try_load_logo_b64()
def render_logo_html(px: int = 96) -> str:
    """Build the HTML header banner: optional logo image, title and tagline.

    ``px`` is the rendered logo height in pixels. The ``<img>`` tag is left
    out entirely when ``LOGO_B64`` is empty.
    """
    if LOGO_B64:
        logo_tag = (
            f'<img src="data:image/png;base64,{LOGO_B64}" '
            f'style="height:{px}px;width:auto;" />'
        )
    else:
        logo_tag = ""
    return f"""
<div style="display:flex;align-items:center;gap:16px;">
{logo_tag}
<div>
<div style="font-size:1.6rem;font-weight:800;">Valknut · Re-encode Images-to-Video</div>
<div style="opacity:0.8;">Valknut binds frames and sound, encoding into one unified whole</div>
</div>
</div>
<hr>
"""
def _which(name: str) -> Optional[str]:
from shutil import which
return which(name)
# Resolve the external tools once at import time; None means "not found".
FFMPEG = _which("ffmpeg")
FFPROBE = _which("ffprobe")

# Warning text for the UI when either tool is missing; empty string otherwise.
if FFMPEG and FFPROBE:
    MISSING_MSG = ""
else:
    MISSING_MSG = (
        "⚠️ FFmpeg/FFprobe not found. Add a 'packages.txt' with:\n"
        "ffmpeg\nlibsm6\nlibxext6\nand restart the Space."
    )
def render_progress(pct: float, label: str = "") -> str:
    """Render a progress bar plus caption as an HTML snippet.

    ``pct`` is clamped to the 0-100 range and used for both the bar width and
    the percentage printed after ``label`` in the caption.
    """
    capped = min(100.0, pct)
    clamped = max(0.0, capped)
    shown = f"{clamped:.1f}"
    bar = (
        '<div style="width:100%;border:1px solid #ddd;border-radius:8px;overflow:hidden;height:18px;">\n'
        f'<div style="height:100%;width:{shown}%;background:#3b82f6;"></div></div>'
    )
    caption = f'<div style="font-size:12px;opacity:.8;margin-top:4px;">{label} {shown}%</div>'
    return bar + "\n" + caption
def prepare_frames_from_upload(files: List[gr.File] | None, prefix: str = "enc") -> Tuple[Optional[str], Optional[str]]:
if not files:
return None, None
work = Path(tempfile.mkdtemp(prefix="enc_"))
frames_dir = work / "frames"; frames_dir.mkdir(parents=True, exist_ok=True)
detected_prefix = None
if len(files) == 1 and Path(files[0].name).suffix.lower() == ".zip":
with zipfile.ZipFile(files[0].name, "r") as zf:
zf.extractall(frames_dir)
imgs = sorted(frames_dir.glob("*.jpg")) + sorted(frames_dir.glob("*.png"))
if imgs:
detected_prefix = Path(imgs[0]).stem.split("_")[0]
return str(frames_dir), detected_prefix or prefix
counter = 1
for f in files:
src = Path(f.name)
if src.suffix.lower() not in [".jpg", ".jpeg", ".png"]:
continue
dst = frames_dir / f"{prefix}_{counter:05d}{src.suffix.lower()}"
src.replace(dst) if src.exists() else None
counter += 1
return str(frames_dir), prefix
def build_ffmpeg_encode(frames_dir: str, prefix: str, fps: float, fmt: str,
include_audio: bool, orig_video: str | None) -> list[str]:
jpgs = sorted(Path(frames_dir).glob(f"{prefix}_*.jpg"))
pngs = sorted(Path(frames_dir).glob(f"{prefix}_*.png"))
imgs = jpgs if jpgs else pngs
if not imgs:
return []
first_frame = imgs[0].name
pattern = str(imgs[0].with_name(f"{prefix}_%05d{imgs[0].suffix}"))
start_num = int(Path(first_frame).stem.split("_")[-1])
args = [FFMPEG, "-y", "-start_number", str(start_num),
"-framerate", f"{fps:.6f}", "-i", pattern]
if include_audio and orig_video:
args += ["-i", orig_video, "-map", "0:v:0", "-map", "1:a:0", "-shortest"]
if fmt == "h265":
vcodec = ["-c:v", "libx265"]
elif fmt == "vp9":
vcodec = ["-c:v", "libvpx-vp9"]
else:
vcodec = ["-c:v", "libx264"]
out_name = "output.mp4" if fmt in ("h264", "h265") else "output.webm"
return args + vcodec + ["-pix_fmt", "yuv420p", "-crf", "18", out_name]
def step3_encode(
    uploaded_frames: List[object] | None,
    uploaded_audio_video: object | None,
    fps: float,
    fmt: str,
    include_audio: bool,
    prog_html: str,
):
    """Encode the uploaded frames into a video, streaming progress updates.

    This is a generator. Every value it yields is a 3-tuple
    ``(video_path_or_None, status_text, progress_html)``: intermediate tuples
    carry ``None`` as the first item, and the last one carries the output
    file path on success.

    Args:
        uploaded_frames: Uploaded frame images or a single ZIP, handed to
            ``prepare_frames_from_upload``.
        uploaded_audio_video: Optional media file supplying the audio track,
            given as a path or as an object exposing the path via ``.name``.
        fps: Frame rate; a falsy value falls back to 30.
        fmt: Codec choice handed to ``build_ffmpeg_encode``.
        include_audio: Whether to take audio from ``uploaded_audio_video``.
        prog_html: Current progress HTML, echoed back unchanged when the
            function stops before encoding starts.
    """
    from collections import deque  # local import: keeps this block independent of the file header

    if not (FFMPEG and FFPROBE):
        yield None, "FFmpeg/FFprobe missing. See note below.", prog_html
        return
    frames_dir, prefix = prepare_frames_from_upload(uploaded_frames, "enc")
    if not frames_dir or not prefix:
        yield None, "No frames provided. Upload a ZIP or images.", prog_html
        return

    orig_path = None
    if uploaded_audio_video:
        if isinstance(uploaded_audio_video, (str, os.PathLike)):
            orig_path = os.fspath(uploaded_audio_video)
        else:
            orig_path = str(getattr(uploaded_audio_video, "name"))

    cmd = build_ffmpeg_encode(frames_dir, prefix, float(fps or 30.0), fmt, include_audio, orig_path)
    if not cmd:
        # The upload held no usable frame; without this guard the indexing
        # below would raise IndexError.
        yield None, f"No usable frames found (expected files named {prefix}_<number>.jpg or .png).", prog_html
        return

    # -progress writes machine-readable key=value lines to stderr; -nostats
    # drops the human-oriented status line so it does not interleave with them.
    cmd = [cmd[0], "-nostats", "-progress", "pipe:2"] + cmd[1:]

    # The encoder uses the .jpg frames when present and the .png ones
    # otherwise, so count the same set for the progress denominator.
    frame_root = Path(frames_dir)
    total_frames = (
        sum(1 for _ in frame_root.glob(f"{prefix}_*.jpg"))
        or sum(1 for _ in frame_root.glob(f"{prefix}_*.png"))
    )

    # Run inside frames_dir so the relative output name lands next to the frames.
    proc = subprocess.Popen(
        cmd, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL,
        text=True, errors="replace", bufsize=1, cwd=frames_dir
    )
    current = 0
    log_tail = deque(maxlen=15)  # last non-progress stderr lines, reported on failure
    try:
        for line in proc.stderr:
            key, sep, value = line.partition("=")
            if sep and key.isidentifier():
                # A "-progress" record; only the frame counter is of interest.
                if key != "frame":
                    continue
                try:
                    current = int(value.strip())
                except ValueError:
                    continue
                if total_frames > 0:
                    pct = min(100.0, (current / total_frames) * 100.0)
                    yield None, f"Encoding… {current}/{total_frames} frames", render_progress(pct, f"Encoding {pct:.0f}%")
                else:
                    yield None, "Encoding…", render_progress(50.0, "Encoding…")
            elif line.strip():
                log_tail.append(line.strip())
        ret = proc.wait()
    finally:
        # Reached early when the consumer abandons the generator or an error
        # occurs: make sure ffmpeg does not keep running in the background.
        if proc.poll() is None:
            proc.kill()
            proc.wait()
        proc.stderr.close()

    out_file = Path(frames_dir) / ("output.mp4" if fmt in ("h264", "h265") else "output.webm")
    if ret != 0 or not out_file.exists():
        err = "\n".join(log_tail) or "Unknown ffmpeg error."
        yield None, f"Encoding failed.\n\n{err}", render_progress(0.0, "Failed")
        return
    yield str(out_file), f"Video created: {out_file.name}", render_progress(100.0, "Encoding complete")
# Assemble the Gradio Blocks interface (header, uploads, encode settings, progress bar, output player) and return it.
def build_ui():
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.HTML(render_logo_html(88))
gr.Markdown("Re-encode a folder/ZIP of frames into a video. Optionally mix audio from a video/audio file.")
# Inputs: the frame images (or one ZIP) and an optional media file to take the audio track from.
uploaded_frames = gr.Files(label="Upload frames (ZIP or images)", type="filepath")
uploaded_audio = gr.File(label="Optional: video/audio for audio track", file_types=[".mp4",".mov",".mkv",".webm",".mp3",".wav"], type="filepath")
# NOTE(review): presumably the FPS / format / audio controls below are the children of this Row — confirm against the original indentation.
with gr.Row():
fps = gr.Number(value=30.0, label="FPS")
fmt = gr.Dropdown(["h264", "h265", "vp9"], value="h264", label="Video format")
include_audio = gr.Checkbox(True, label="Include audio if available")
btn_encode = gr.Button("Create Video", variant="primary")
prog = gr.HTML(render_progress(0.0, "Idle"))
video_player = gr.Video(label="Output video")
details = gr.Markdown("")
# step3_encode is a generator; each tuple it yields is routed to (video_player, details, prog).
# The progress component is also passed as the last input, so the handler receives its current HTML.
btn_encode.click(
step3_encode,
inputs=[uploaded_frames, uploaded_audio, fps, fmt, include_audio, prog],
outputs=[video_player, details, prog],
)
# MISSING_MSG is non-empty only when ffmpeg or ffprobe was not found at import time.
if MISSING_MSG:
gr.Markdown(f"<span style='color:#b45309'>{MISSING_MSG}</span>")
return demo
if __name__ == "__main__":
    # Script entry point: build the interface, enable queuing, start the server.
    app = build_ui()
    app.queue().launch()
|