import hashlib import os import gradio as gr import math import random import subprocess import gdown import requests from concurrent.futures import ThreadPoolExecutor def download_file(url: str, output: str = None) -> str: if 'drive.google.com' in url: gdown.download(url, output=output) return output res = requests.get(url, stream=True) res.raise_for_status() with open(output, 'wb') as f: for chunk in res.iter_content(chunk_size=8192): f.write(chunk) return output def md5(s: str) -> str: return hashlib.md5(s.encode()).hexdigest() def dur(f): out = subprocess.check_output(( f'ffprobe -v error -show_entries format=duration ' f'-of default=noprint_wrappers=1:nokey=1 "{f}"' ), shell=True) return float(out.strip()) def concat_video(vs: list[str], secs: float, dir: str, shuffle=True): if shuffle: random.shuffle(vs) vdur = sum(dur(v) for v in vs) rep = math.ceil(secs / vdur) vs = vs * rep output = f"{dir}/concat.mp4" videos_path = f"{dir}/videos.txt" with open(videos_path, "w") as f: f.write("\n".join([f"file '{v}'" for v in vs])) tmp_concat = f"{dir}/tmp_concat.mp4" subprocess.run(( f'ffmpeg -y -v error -f concat -safe 0 -i "{videos_path}" -c copy "{tmp_concat}"' ), shell=True, check=True) subprocess.run(( f'ffmpeg -y -v error -i "{tmp_concat}" -t {secs:.3f} -c copy "{output}"' ), shell=True, check=True) return output def concat_audio(video_file, audio_file, sub_file, output): subprocess.run(( f'ffmpeg -y -i "{video_file}" -i "{audio_file}" ' f'-vf "ass={sub_file}" ' f'-map 0:v:0 -map 1:a:0 -c:v libx264 -preset veryfast -crf 23 -c:a copy "{output}"' ), check=True, shell=True) def render_video(clip_dir: str, audio_link: str, sub_link: str) -> str: uid = md5(audio_link) videos =[] if 'drive.google.com' in clip_dir: drive_files = gdown.download_folder(clip_dir, skip_download=True) drive_ids = [f.id for f in drive_files] with ThreadPoolExecutor(max_workers=5) as executor: futures = [] for id in drive_ids: if os.path.exists(f'/tmp/{md5(id)}.mp4'): videos.append(f'/tmp/{md5(id)}.mp4') futures.append(executor.submit(lambda id=id: gdown.download(id=id, output=f'/tmp/{md5(id)}.mp4'))) videos = [f.result() for f in futures if f.result()] print('clips', videos) if not videos: raise gr.Error("No video clips found.") audio_file = download_file(audio_link, output=f'/tmp/{uid}.mp3') if not audio_file: raise gr.Error("Failed to download audio file.") sub_file = download_file(sub_link, output=f'/tmp/{uid}.txt') if not sub_file: raise gr.Error("Failed to download subtitle file.") os.makedirs(f"/tmp/{uid}", exist_ok=True) video_file = concat_video(videos, secs=dur(audio_file), dir=f"/tmp/{uid}") output = f"/tmp/{uid}.mp4" concat_audio(video_file, audio_file, sub_file, output=output) return output with gr.Blocks(title="Render video from clips and audio", analytics_enabled=True) as demo: with gr.Row(): with gr.Column(scale=3): gr.Markdown("# Render video") with gr.Column(scale=1): btn_submit = gr.Button("RUN", variant="primary") btn_submit.click(fn=render_video, inputs=[ gr.Textbox(label="Clips", max_lines=1, placeholder="https://drive.google.com/drive/folders/...", info="Link to the folder containing video clips", show_copy_button=True), gr.Textbox(label="Audio", max_lines=1, placeholder="https://...", info="Link to the audio file", show_copy_button=True), gr.Textbox(label="Subtitle", max_lines=1, placeholder="https://...", info="Link to the subtitle file", show_copy_button=True), ], outputs=[ gr.Video(label="Output video", show_download_button=True, height=240 ) ]) demo = gr.Interface( title="Render video from clips and audio", fn=render_video, clear_btn=None, inputs=[ gr.Textbox(label="Clips", max_lines=1, placeholder="https://drive.google.com/drive/folders/...", info="Link to the folder containing video clips", show_copy_button=True), gr.Textbox(label="Audio", max_lines=1, placeholder="https://...", info="Link to the audio file", show_copy_button=True), gr.Textbox(label="Subtitle", max_lines=1, placeholder="https://...", info="Link to the subtitle file", show_copy_button=True), ], outputs=[ gr.Video(label="Output video", show_download_button=True, height=240 ) ], allow_flagging="never", api_name="render", ) if __name__ == "__main__": demo.launch(show_error=True)