| import hashlib |
| import os |
| import gradio as gr |
| import math |
| import random |
| import subprocess |
| import gdown |
| import requests |
| from concurrent.futures import ThreadPoolExecutor |
|
|
|
|
def download_file(url: str, output: str = None) -> str:
    """Download ``url`` to a local file and return the path that was written.

    Google Drive links are delegated to ``gdown``; every other URL is
    streamed to disk with ``requests`` in 8 KiB chunks.

    Args:
        url: Source URL.
        output: Destination path. Required for non-Drive URLs. For Drive
            links it is forwarded to ``gdown.download`` unchanged.

    Returns:
        The path of the downloaded file. For Drive links this is whatever
        ``gdown.download`` returns, so a falsy value signals a failed
        download to callers that check for it.

    Raises:
        ValueError: If ``output`` is missing for a non-Drive URL.
        requests.HTTPError: If the server answers with an error status.
    """
    if 'drive.google.com' in url:
        # fuzzy=True lets gdown extract the file id from share links such as
        # ".../file/d/<id>/view" in addition to the "uc?id=<id>" form.
        # Return gdown's own result instead of `output` so that a failed
        # download is not reported as a success.
        return gdown.download(url, output=output, fuzzy=True)

    if output is None:
        raise ValueError("output path is required for non-Google-Drive URLs")

    # The context manager releases the pooled connection even when writing
    # fails; the timeout keeps a stalled server from hanging the request.
    with requests.get(url, stream=True, timeout=30) as res:
        res.raise_for_status()

        with open(output, 'wb') as f:
            for chunk in res.iter_content(chunk_size=8192):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)

    return output
|
|
def md5(s: str) -> str:
    """Return the hexadecimal MD5 digest of ``s``.

    The text is converted to bytes with ``str.encode()`` (default encoding)
    before hashing.
    """
    hasher = hashlib.md5()
    hasher.update(s.encode())
    return hasher.hexdigest()
|
|
def dur(f):
    """Return the duration of media file ``f`` in seconds, as reported by ffprobe.

    Args:
        f: Path of the media file to inspect.

    Returns:
        The container-level duration (``format=duration``) as a float.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
        ValueError: If ffprobe's output cannot be parsed as a number.
    """
    # Pass an argument list instead of a shell string so that quotes, spaces
    # or shell metacharacters in the file name cannot break or inject into
    # the command.
    out = subprocess.check_output([
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        str(f),
    ])
    return float(out.strip())
|
|
|
|
def concat_video(vs: list[str], secs: float, dir: str, shuffle=True):
    """Loop the clips in ``vs`` into one video of ``secs`` seconds inside ``dir``.

    The clips are (optionally) shuffled, repeated often enough to cover
    ``secs``, joined with ffmpeg's concat demuxer using stream copy, and the
    result is then cut to ``secs`` with a second stream-copy pass.

    Args:
        vs: Paths of the source clips. The list itself is left untouched.
        secs: Target duration in seconds; must be positive.
        dir: Existing working directory; receives ``videos.txt``,
            ``tmp_concat.mp4`` and the final ``concat.mp4``.
        shuffle: Randomise the clip order before concatenating.

    Returns:
        Path of the trimmed video, ``{dir}/concat.mp4``.

    Raises:
        ValueError: If ``vs`` is empty, ``secs`` is not positive, or the
            clips have no measurable duration.
        subprocess.CalledProcessError: If ffprobe or ffmpeg fails.
    """
    if not vs:
        raise ValueError("concat_video() needs at least one clip")
    if secs <= 0:
        raise ValueError("secs must be positive")

    # Shuffle a copy: random.shuffle works in place and would otherwise
    # reorder the caller's list as a hidden side effect.
    clips = list(vs)
    if shuffle:
        random.shuffle(clips)

    total = sum(dur(clip) for clip in clips)
    if total <= 0:
        # Guards the division below against clips that report no duration.
        raise ValueError("clips have no measurable duration")

    playlist = clips * math.ceil(secs / total)

    output = f"{dir}/concat.mp4"
    videos_path = f"{dir}/videos.txt"
    tmp_concat = f"{dir}/tmp_concat.mp4"

    with open(videos_path, "w", encoding="utf-8") as f:
        # Inside the single-quoted `file '...'` directive a literal quote has
        # to be written as '\'' (close quote, escaped quote, reopen quote).
        f.write("\n".join(
            "file '{}'".format(clip.replace("'", "'\\''")) for clip in playlist
        ))

    # Argument lists (no shell) keep odd characters in paths from being
    # interpreted by a shell.
    subprocess.run([
        "ffmpeg", "-y", "-v", "error",
        "-f", "concat", "-safe", "0", "-i", videos_path,
        "-c", "copy", tmp_concat,
    ], check=True)

    subprocess.run([
        "ffmpeg", "-y", "-v", "error",
        "-i", tmp_concat, "-t", f"{secs:.3f}",
        "-c", "copy", output,
    ], check=True)

    return output
|
|
|
|
def concat_audio(video_file, audio_file, sub_file, output):
    """Combine video, audio and burned-in subtitles into ``output`` with ffmpeg.

    The first video stream of ``video_file`` is re-encoded with libx264
    (preset ``veryfast``, CRF 23) while the ``ass`` filter renders
    ``sub_file`` onto the frames; the first audio stream of ``audio_file``
    is copied without re-encoding.

    Args:
        video_file: Path of the video input.
        audio_file: Path of the audio input.
        sub_file: Path of the subtitle file handed to the ``ass`` filter.
        output: Path of the file to write (overwritten if it exists).

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    # An argument list (no shell) means paths containing quotes, spaces or
    # shell metacharacters are passed to ffmpeg verbatim.
    # NOTE: sub_file is still inserted unescaped into the filter expression,
    # so characters special to ffmpeg's filtergraph syntax (':', ',', '\'')
    # in that path are not supported.
    subprocess.run([
        'ffmpeg', '-y',
        '-i', video_file,
        '-i', audio_file,
        '-vf', f'ass={sub_file}',
        '-map', '0:v:0', '-map', '1:a:0',
        '-c:v', 'libx264', '-preset', 'veryfast', '-crf', '23',
        '-c:a', 'copy',
        output,
    ], check=True)
|
|
|
|
|
|
def _fetch_drive_clips(folder_url: str) -> list[str]:
    """Download the files of a Google Drive folder to /tmp, reusing cached copies."""
    # Only list the folder here (skip_download=True); a falsy result is
    # treated as an empty folder so the caller can report "no clips".
    entries = gdown.download_folder(folder_url, skip_download=True) or []

    # One (target path, future-or-None) pair per Drive file, in folder order.
    jobs = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for entry in entries:
            target = f'/tmp/{md5(entry.id)}.mp4'
            if os.path.exists(target):
                # Already on disk from an earlier run: do not download again.
                jobs.append((target, None))
            else:
                jobs.append((
                    target,
                    executor.submit(gdown.download, id=entry.id, output=target),
                ))

    clips = []
    for target, future in jobs:
        path = target if future is None else future.result()
        if path:  # drop downloads that returned nothing
            clips.append(path)
    return clips


def render_video(clip_dir: str, audio_link: str, sub_link: str) -> str:
    """Render a video from a folder of clips, an audio track and subtitles.

    The clips are fetched (with an on-disk cache under /tmp), looped to the
    length of the audio, and then muxed with the audio while the subtitles
    are burned in. All working files are keyed by the MD5 of ``audio_link``.

    Args:
        clip_dir: Google Drive folder link containing the video clips.
        audio_link: URL of the audio file.
        sub_link: URL of the subtitle file.

    Returns:
        Path of the rendered video, ``/tmp/<md5(audio_link)>.mp4``.

    Raises:
        gr.Error: If no clips are available or a download yields no file.
    """
    uid = md5(audio_link)

    videos = []
    if 'drive.google.com' in clip_dir:
        videos = _fetch_drive_clips(clip_dir)

    print('clips', videos)

    if not videos:
        raise gr.Error("No video clips found.")

    audio_file = download_file(audio_link, output=f'/tmp/{uid}.mp3')
    if not audio_file:
        raise gr.Error("Failed to download audio file.")

    sub_file = download_file(sub_link, output=f'/tmp/{uid}.txt')
    if not sub_file:
        raise gr.Error("Failed to download subtitle file.")

    os.makedirs(f"/tmp/{uid}", exist_ok=True)
    video_file = concat_video(videos, secs=dur(audio_file), dir=f"/tmp/{uid}")

    output = f"/tmp/{uid}.mp4"
    concat_audio(video_file, audio_file, sub_file, output=output)

    return output
|
|
|
|
# Gradio UI served by this module.
#
# NOTE: `demo` was previously bound twice: first to a hand-laid-out gr.Blocks
# UI and then, immediately afterwards, to this gr.Interface. The second
# assignment replaced the first, so the Blocks layout was built but never
# launched. Only the definition that actually runs is kept; it also exposes
# render_video under the API name "render".
demo = gr.Interface(
    title="Render video from clips and audio",
    fn=render_video,
    clear_btn=None,
    inputs=[
        gr.Textbox(
            label="Clips",
            max_lines=1,
            placeholder="https://drive.google.com/drive/folders/...",
            info="Link to the folder containing video clips",
            show_copy_button=True,
        ),
        gr.Textbox(
            label="Audio",
            max_lines=1,
            placeholder="https://...",
            info="Link to the audio file",
            show_copy_button=True,
        ),
        gr.Textbox(
            label="Subtitle",
            max_lines=1,
            placeholder="https://...",
            info="Link to the subtitle file",
            show_copy_button=True,
        ),
    ],
    outputs=[
        gr.Video(label="Output video", show_download_button=True, height=240),
    ],
    allow_flagging="never",
    api_name="render",
)
|
|
# Start the Gradio app only when this file is executed directly, not when it
# is imported as a module.
if __name__ == "__main__":
    demo.launch(show_error=True)
|
|