Spaces:
Sleeping
Sleeping
| """ | |
| ffmpeg utility wrappers — all subprocess calls go through here. | |
| Assumes ffmpeg is available on PATH. | |
| """ | |
| import asyncio | |
| import subprocess | |
| import os | |
| from pathlib import Path | |
| from typing import List, Optional | |
| async def run_ffmpeg(args: List[str], timeout: int = 120) -> str: | |
| """Run an ffmpeg command in a thread pool (Windows-safe).""" | |
| cmd = ["ffmpeg", "-y"] + args | |
| loop = asyncio.get_event_loop() | |
| def _run(): | |
| return subprocess.run( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| timeout=timeout, | |
| check=True | |
| ) | |
| try: | |
| result = await loop.run_in_executor(None, _run) | |
| return result.stderr | |
| except subprocess.CalledProcessError as e: | |
| raise RuntimeError(f"ffmpeg error:\n{e.stderr}") | |
| except subprocess.TimeoutExpired: | |
| raise RuntimeError(f"ffmpeg timed out after {timeout}s") | |
| async def run_ffprobe(args: List[str], timeout: int = 30) -> str: | |
| """Run an ffprobe command in a thread pool (Windows-safe).""" | |
| cmd = ["ffprobe"] + args | |
| loop = asyncio.get_event_loop() | |
| def _run(): | |
| return subprocess.run( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| timeout=timeout, | |
| check=True | |
| ) | |
| try: | |
| result = await loop.run_in_executor(None, _run) | |
| return result.stdout | |
| except subprocess.CalledProcessError as e: | |
| raise RuntimeError(f"ffprobe error:\n{e.stderr}") | |
| async def ken_burns_effect( | |
| image_path: str, | |
| output_path: str, | |
| duration: float = 4.0, | |
| zoom_direction: str = "in", | |
| resolution: str = "768x432", | |
| ) -> None: | |
| """ | |
| Apply Ken Burns (zoom/pan) effect to a single image using ffmpeg. | |
| Produces a short MP4 clip. | |
| """ | |
| w, h = resolution.split("x") | |
| fps = 25 | |
| total_frames = int(duration * fps) | |
| if zoom_direction == "in": | |
| zoom_expr = f"'min(zoom+0.0015,1.5)'" | |
| x_expr = "iw/2-(iw/zoom/2)" | |
| y_expr = "ih/2-(ih/zoom/2)" | |
| else: # out | |
| zoom_expr = f"'if(lte(zoom,1.0),1.5,zoom-0.0015)'" | |
| x_expr = "iw/2-(iw/zoom/2)" | |
| y_expr = "ih/2-(ih/zoom/2)" | |
| vf = ( | |
| f"scale=8000:-1," | |
| f"zoompan=z={zoom_expr}:x={x_expr}:y={y_expr}" | |
| f":d={total_frames}:s={resolution}:fps={fps}," | |
| f"setsar=1" | |
| ) | |
| await run_ffmpeg([ | |
| "-loop", "1", | |
| "-i", image_path, | |
| "-vf", vf, | |
| "-t", str(duration), | |
| "-c:v", "libx264", | |
| "-preset", "fast", | |
| "-crf", "23", | |
| "-pix_fmt", "yuv420p", | |
| output_path, | |
| ]) | |
| async def concat_clips(clip_paths: List[str], output_path: str) -> None: | |
| """Concatenate video clips using ffmpeg concat demuxer.""" | |
| list_file = output_path + ".txt" | |
| with open(list_file, "w") as f: | |
| for p in clip_paths: | |
| abs_path = os.path.abspath(p).replace("\\", "/") | |
| f.write(f"file '{abs_path}'\n") | |
| try: | |
| await run_ffmpeg([ | |
| "-f", "concat", | |
| "-safe", "0", | |
| "-i", list_file, | |
| "-c", "copy", | |
| output_path, | |
| ]) | |
| finally: | |
| if os.path.exists(list_file): | |
| os.remove(list_file) | |
| async def merge_audio_video( | |
| video_path: str, | |
| audio_path: str, | |
| output_path: str, | |
| aspect_ratio: str = "16:9", | |
| ) -> None: | |
| """Merge a video track with an audio track; loop audio to match video.""" | |
| vf_map = {"16:9": "1280x720", "9:16": "720x1280"} | |
| resolution = vf_map.get(aspect_ratio, "1280x720") | |
| # We loop the audio (-stream_loop -1) so it covers the whole video. | |
| # We use a simpler filter chain first, then try loudnorm. | |
| # If loudnorm fails (common on very short/silent audio), we fall back to simple merge. | |
| try: | |
| await run_ffmpeg([ | |
| "-i", video_path, | |
| "-stream_loop", "-1", | |
| "-i", audio_path, | |
| "-vf", f"scale={resolution},setsar=1", | |
| "-af", "loudnorm", | |
| "-shortest", | |
| "-c:v", "libx264", | |
| "-preset", "fast", | |
| "-crf", "22", | |
| "-c:a", "aac", | |
| "-b:a", "128k", | |
| "-movflags", "+faststart", | |
| output_path, | |
| ]) | |
| except Exception as e: | |
| # Fallback without loudnorm | |
| if os.path.exists(output_path): | |
| os.remove(output_path) | |
| await run_ffmpeg([ | |
| "-i", video_path, | |
| "-stream_loop", "-1", | |
| "-i", audio_path, | |
| "-vf", f"scale={resolution},setsar=1", | |
| "-af", "volume=1.0", # Safe fallback | |
| "-shortest", | |
| "-c:v", "libx264", | |
| "-preset", "fast", | |
| "-crf", "22", | |
| "-c:a", "aac", | |
| "-b:a", "128k", | |
| "-movflags", "+faststart", | |
| output_path, | |
| ]) | |
| async def image_to_silent_clip(image_path: str, output_path: str, duration: float = 4.0) -> None: | |
| """Simple still-image clip (no motion) as ultimate fallback.""" | |
| await run_ffmpeg([ | |
| "-loop", "1", | |
| "-i", image_path, | |
| "-t", str(duration), | |
| "-c:v", "libx264", | |
| "-preset", "fast", | |
| "-crf", "23", | |
| "-pix_fmt", "yuv420p", | |
| "-vf", "scale=768:432,setsar=1", | |
| output_path, | |
| ]) | |