# tool / pkg /render_run.py
# Vo Hoang Minh
# u
# e2360c1
import hashlib
import os
import gradio as gr
import math
import random
import subprocess
import gdown
import requests
from concurrent.futures import ThreadPoolExecutor
def download_file(url: str, output: str = None) -> str:
    """Download ``url`` to the local path ``output``.

    URLs containing ``drive.google.com`` are delegated to ``gdown.download``;
    every other URL is streamed to disk with ``requests`` in 8 KiB chunks.

    Args:
        url: Location of the file to fetch.
        output: Destination path. Required for non-Drive URLs.

    Returns:
        The path of the downloaded file. For Drive links this is whatever
        ``gdown.download`` returns, so callers should treat a falsy result
        as a failed download.

    Raises:
        ValueError: if ``output`` is missing for a non-Drive URL.
        requests.RequestException: on connection errors, timeouts or an
            HTTP error status.
    """
    if 'drive.google.com' in url:
        # Propagate gdown's own result instead of unconditionally returning
        # ``output`` so that a failed download is visible to the caller.
        return gdown.download(url, output=output)

    if output is None:
        raise ValueError("output path is required for non-Drive URLs")

    # The context manager releases the streamed connection even when writing
    # fails; the timeout keeps a stalled server from hanging the request.
    with requests.get(url, stream=True, timeout=(10, 60)) as res:
        res.raise_for_status()
        with open(output, 'wb') as f:
            for chunk in res.iter_content(chunk_size=8192):
                f.write(chunk)
    return output
def md5(s: str) -> str:
    """Return the hexadecimal MD5 digest of ``s``.

    The string is encoded with ``str.encode()`` defaults before hashing.
    The digest is used in this file to derive cache/file names.
    """
    hasher = hashlib.md5()
    hasher.update(s.encode())
    return hasher.hexdigest()
def dur(f):
    """Return the duration of media file ``f`` in seconds, as reported by ffprobe.

    Args:
        f: Path of the media file to inspect.

    Returns:
        The container-level ``format=duration`` value as a float.

    Raises:
        subprocess.CalledProcessError: if ffprobe exits with a non-zero status.
        ValueError: if ffprobe's output cannot be parsed as a float.
    """
    # An argument list (no shell) passes the path through verbatim, so quotes
    # or shell metacharacters in the file name cannot alter the command.
    out = subprocess.check_output([
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        f,
    ])
    return float(out.strip())
def concat_video(vs: list[str], secs: float, dir: str, shuffle=True):
    """Concatenate clips into a single video that is ``secs`` seconds long.

    The clip list is repeated as many times as needed to cover ``secs``,
    joined with ffmpeg's concat demuxer (stream copy, no re-encode) and then
    trimmed to ``secs`` with ``-t``.

    Args:
        vs: Paths of the source clips. The caller's list is not modified.
        secs: Target duration in seconds.
        dir: Working directory; receives ``videos.txt``, ``tmp_concat.mp4``
            and the final ``concat.mp4``.
        shuffle: Randomise the clip order before concatenating.

    Returns:
        Path of the trimmed video, ``{dir}/concat.mp4``.

    Raises:
        ValueError: if ``vs`` is empty or the clips have no total duration.
        subprocess.CalledProcessError: if ffprobe/ffmpeg fails.
    """
    if not vs:
        raise ValueError("concat_video needs at least one clip")

    # Work on a copy so shuffling does not reorder the caller's list.
    clips = list(vs)
    if shuffle:
        random.shuffle(clips)

    vdur = sum(dur(v) for v in clips)
    if vdur <= 0:
        raise ValueError("clips have no measurable duration")

    # Repeat the playlist until it is at least as long as the target.
    rep = math.ceil(secs / vdur)
    clips = clips * rep

    output = f"{dir}/concat.mp4"
    videos_path = f"{dir}/videos.txt"
    with open(videos_path, "w") as f:
        # Inside a single-quoted concat entry a literal quote is written '\''.
        f.write("\n".join(
            "file '{}'".format(v.replace("'", "'\\''")) for v in clips
        ))

    tmp_concat = f"{dir}/tmp_concat.mp4"
    # Argument lists (no shell) keep paths with spaces or quotes intact.
    subprocess.run([
        'ffmpeg', '-y', '-v', 'error',
        '-f', 'concat', '-safe', '0',
        '-i', videos_path,
        '-c', 'copy', tmp_concat,
    ], check=True)
    subprocess.run([
        'ffmpeg', '-y', '-v', 'error',
        '-i', tmp_concat,
        '-t', f'{secs:.3f}',
        '-c', 'copy', output,
    ], check=True)
    return output
def concat_audio(video_file, audio_file, sub_file, output):
    """Mux ``audio_file`` onto ``video_file`` and burn in the subtitles.

    The first video stream of ``video_file`` is re-encoded with libx264
    (preset ``veryfast``, CRF 23) through the ``ass`` subtitle filter, and
    the first audio stream of ``audio_file`` is copied without re-encoding.

    Args:
        video_file: Path of the source video.
        audio_file: Path of the audio track to attach.
        sub_file: Path of the subtitle file handed to the ``ass`` filter.
        output: Path of the file to write (overwritten if it exists).

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    # An argument list (no shell) prevents shell interpretation of the paths.
    # NOTE(review): ``sub_file`` is still parsed by ffmpeg's filter-graph
    # syntax, so characters special to it in the path would need escaping.
    subprocess.run([
        'ffmpeg', '-y',
        '-i', video_file,
        '-i', audio_file,
        '-vf', f'ass={sub_file}',
        '-map', '0:v:0', '-map', '1:a:0',
        '-c:v', 'libx264', '-preset', 'veryfast', '-crf', '23',
        '-c:a', 'copy',
        output,
    ], check=True)
def render_video(clip_dir: str, audio_link: str, sub_link: str) -> str:
    """Render a subtitled video from a folder of clips and an audio track.

    Clips are fetched from a Google Drive folder, concatenated to the length
    of the audio, then combined with the audio and subtitles.

    Args:
        clip_dir: Link to the Google Drive folder holding the clips. Links
            that do not contain ``drive.google.com`` yield no clips.
        audio_link: URL of the audio file; its MD5 names all working files.
        sub_link: URL of the subtitle file.

    Returns:
        Path of the rendered video, ``/tmp/<md5(audio_link)>.mp4``.

    Raises:
        gr.Error: if no clips were found or a download reported failure.
    """
    uid = md5(audio_link)

    def fetch_clip(file_id):
        """Return the cached clip path, downloading it only if it is missing."""
        path = f'/tmp/{md5(file_id)}.mp4'
        if os.path.exists(path):
            return path
        return gdown.download(id=file_id, output=path)

    videos = []
    if 'drive.google.com' in clip_dir:
        # Treat a falsy listing as "no files" rather than iterating over it.
        drive_files = gdown.download_folder(clip_dir, skip_download=True) or []
        drive_ids = [f.id for f in drive_files]
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(fetch_clip, file_id) for file_id in drive_ids]
            # Keep listing order; drop clips whose download returned nothing.
            videos = [path for path in (f.result() for f in futures) if path]
    print('clips', videos)
    if not videos:
        raise gr.Error("No video clips found.")

    audio_file = download_file(audio_link, output=f'/tmp/{uid}.mp3')
    if not audio_file:
        raise gr.Error("Failed to download audio file.")

    sub_file = download_file(sub_link, output=f'/tmp/{uid}.txt')
    if not sub_file:
        raise gr.Error("Failed to download subtitle file.")

    os.makedirs(f"/tmp/{uid}", exist_ok=True)
    video_file = concat_video(videos, secs=dur(audio_file), dir=f"/tmp/{uid}")
    output = f"/tmp/{uid}.mp4"
    concat_audio(video_file, audio_file, sub_file, output=output)
    return output
# Blocks-based layout: a heading beside a RUN button whose click handler calls
# render_video with three text inputs and shows the result in a video player.
# NOTE(review): `demo` is rebound by the `demo = gr.Interface(...)` assignment
# that follows, so this layout is not the object started by `demo.launch()`.
# NOTE(review): the nesting below was reconstructed from an unindented copy of
# the file - confirm where the click wiring was meant to sit.
with gr.Blocks(title="Render video from clips and audio", analytics_enabled=True) as demo:
    with gr.Row():
        with gr.Column(scale=3):
            gr.Markdown("# Render video")
        with gr.Column(scale=1):
            btn_submit = gr.Button("RUN", variant="primary")
    # The input/output components are created inline in the click() call.
    btn_submit.click(fn=render_video, inputs=[
        gr.Textbox(label="Clips", max_lines=1, placeholder="https://drive.google.com/drive/folders/...", info="Link to the folder containing video clips", show_copy_button=True),
        gr.Textbox(label="Audio", max_lines=1, placeholder="https://...", info="Link to the audio file", show_copy_button=True),
        gr.Textbox(label="Subtitle", max_lines=1, placeholder="https://...", info="Link to the subtitle file", show_copy_button=True),
    ], outputs=[
        gr.Video(label="Output video", show_download_button=True, height=240 )
    ])
# Interface bound to `demo`: three single-line URL boxes (clips folder, audio,
# subtitle) feed render_video, and the returned path is shown as a video.
demo = gr.Interface(
    fn=render_video,
    inputs=[
        gr.Textbox(
            label="Clips",
            max_lines=1,
            placeholder="https://drive.google.com/drive/folders/...",
            info="Link to the folder containing video clips",
            show_copy_button=True,
        ),
        gr.Textbox(
            label="Audio",
            max_lines=1,
            placeholder="https://...",
            info="Link to the audio file",
            show_copy_button=True,
        ),
        gr.Textbox(
            label="Subtitle",
            max_lines=1,
            placeholder="https://...",
            info="Link to the subtitle file",
            show_copy_button=True,
        ),
    ],
    outputs=[
        gr.Video(
            label="Output video",
            show_download_button=True,
            height=240,
        ),
    ],
    title="Render video from clips and audio",
    api_name="render",
    allow_flagging="never",
    clear_btn=None,
)
# Launch the Gradio app only when this file is executed as a script, not when
# it is imported; show_error=True is forwarded to Gradio's launch().
if __name__ == "__main__":
    demo.launch(show_error=True)