# File size: 4,876 Bytes
# 58ff7c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import hashlib
import os
import gradio as gr
import math
import random
import subprocess
import gdown
import requests
from concurrent.futures import ThreadPoolExecutor


def download_file(url: str, output: str = None) -> str:
    """Download ``url`` to the local path ``output`` and return the path.

    URLs containing ``drive.google.com`` are handed to ``gdown.download``;
    every other URL is streamed to disk with ``requests``.

    Args:
        url: Source URL.
        output: Destination file path. Required for non-Drive URLs; for Drive
            links it is passed through to ``gdown.download`` unchanged.

    Returns:
        The path of the downloaded file. For Drive links this is whatever
        ``gdown.download`` returns, so a falsy result (if the installed gdown
        version reports failures that way) reaches the caller.

    Raises:
        ValueError: If ``output`` is missing for a non-Drive URL.
        requests.HTTPError: If the server answers with an error status.
    """
    if 'drive.google.com' in url:
        # Return gdown's own result rather than ``output`` so a failed
        # download is not reported to the caller as a success.
        return gdown.download(url, output=output)

    if not output:
        raise ValueError("output path is required for non-Google-Drive URLs")

    # The context manager releases the connection even if writing fails; the
    # (connect, read) timeout keeps a stalled server from hanging the request.
    with requests.get(url, stream=True, timeout=(10, 60)) as res:
        res.raise_for_status()
        with open(output, 'wb') as f:
            for chunk in res.iter_content(chunk_size=8192):
                f.write(chunk)

    return output

def md5(s: str) -> str:
    """Return the hexadecimal MD5 digest of ``s``.

    The string is encoded with ``str.encode()``'s default codec before
    hashing. Used here only to derive stable file names, not for security.
    """
    hasher = hashlib.md5()
    hasher.update(s.encode())
    return hasher.hexdigest()

def dur(f):
    """Return the duration of media file ``f`` in seconds, as reported by ffprobe.

    Args:
        f: Path of the media file to inspect.

    Returns:
        The container duration as a float.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits with a non-zero status.
        ValueError: If ffprobe's output cannot be parsed as a float.
    """
    # An argv list (no shell) passes the path through verbatim, so quotes,
    # spaces or ``$(...)`` in a file name cannot break or hijack the command.
    out = subprocess.check_output([
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        str(f),
    ])
    return float(out.strip())


def concat_video(vs: list[str], secs: float, dir: str, shuffle=True):
    """Join the clips in ``vs`` into one video of length ``secs`` inside ``dir``.

    The clip list is optionally shuffled, repeated until its total duration
    covers ``secs``, concatenated with ffmpeg's concat demuxer (stream copy),
    and finally trimmed to ``secs`` seconds.

    Args:
        vs: Paths of the source clips. The list is not modified.
        secs: Target duration in seconds.
        dir: Existing working directory; receives ``videos.txt``,
            ``tmp_concat.mp4`` and ``concat.mp4``.
        shuffle: Randomise the clip order before concatenating.

    Returns:
        Path of the trimmed result, ``{dir}/concat.mp4``.

    Raises:
        ValueError: If ``vs`` is empty or the clips have no measurable duration.
        subprocess.CalledProcessError: If ffprobe/ffmpeg fails.
    """
    if not vs:
        raise ValueError("concat_video needs at least one clip")

    # Work on a copy so shuffling never reorders the caller's list.
    clips = list(vs)
    if shuffle:
        random.shuffle(clips)

    vdur = sum(dur(v) for v in clips)
    if vdur <= 0:
        raise ValueError("clips have zero total duration")

    # At least one pass, even when ``secs`` is 0, so the list file is not empty.
    rep = max(1, math.ceil(secs / vdur))
    clips = clips * rep
    output = f"{dir}/concat.mp4"
    videos_path = f"{dir}/videos.txt"

    with open(videos_path, "w") as f:
        # Inside the concat list a single quote must be written as '\'' —
        # close the quoted string, emit an escaped quote, reopen it.
        f.write("\n".join(
            "file '{}'".format(v.replace("'", "'\\''")) for v in clips
        ))

    tmp_concat = f"{dir}/tmp_concat.mp4"

    # argv lists (no shell) keep unusual characters in paths from being
    # interpreted by a shell.
    subprocess.run([
        'ffmpeg', '-y', '-v', 'error',
        '-f', 'concat', '-safe', '0',
        '-i', videos_path,
        '-c', 'copy', tmp_concat,
    ], check=True)

    subprocess.run([
        'ffmpeg', '-y', '-v', 'error',
        '-i', tmp_concat,
        '-t', f'{secs:.3f}',
        '-c', 'copy', output,
    ], check=True)

    return output


def concat_audio(video_file, audio_file, sub_file, output):
    """Mux ``audio_file`` onto ``video_file`` and burn in the ``sub_file`` subtitles.

    The video stream is re-encoded with libx264 (preset ``veryfast``, CRF 23)
    through ffmpeg's ``ass`` filter; the audio stream is copied unchanged.

    Args:
        video_file: Path of the video input (first video stream is used).
        audio_file: Path of the audio input (first audio stream is used).
        sub_file: Path of the subtitle file handed to the ``ass`` filter.
        output: Destination path; overwritten if it exists (``-y``).

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    # An argv list (no shell) passes every path through verbatim, so quotes or
    # ``$(...)`` in a file name cannot alter the command.
    # NOTE: ``sub_file`` is still parsed by ffmpeg's filter syntax; characters
    # that are special there (e.g. ':' or ',') are not escaped here.
    subprocess.run([
        'ffmpeg', '-y',
        '-i', str(video_file),
        '-i', str(audio_file),
        '-vf', f'ass={sub_file}',
        '-map', '0:v:0', '-map', '1:a:0',
        '-c:v', 'libx264', '-preset', 'veryfast', '-crf', '23',
        '-c:a', 'copy',
        str(output),
    ], check=True)



def render_video(clip_dir: str, audio_link: str, sub_link: str) -> str:
    """Render a video from a Drive folder of clips, an audio track and subtitles.

    Clips are fetched into ``/tmp`` (one file per Drive id, reused when already
    present), concatenated to the audio's duration, then combined with the
    audio and the burned-in subtitles.

    Args:
        clip_dir: Link to the folder with the clips. Only links containing
            ``drive.google.com`` are processed.
        audio_link: URL of the audio file; its MD5 names all working files.
        sub_link: URL of the subtitle file.

    Returns:
        Path of the rendered video, ``/tmp/<md5(audio_link)>.mp4``.

    Raises:
        gr.Error: If no clip is available or a download yields no file.
    """
    uid = md5(audio_link)
    videos = []
    if 'drive.google.com' in clip_dir:
        # ``or []`` covers a listing that returns nothing usable.
        drive_files = gdown.download_folder(clip_dir, skip_download=True) or []
        with ThreadPoolExecutor(max_workers=5) as executor:
            # Each entry is a cached path (str) or a pending download future,
            # kept in folder order.
            pending = []
            for drive_file in drive_files:
                file_id = drive_file.id
                cached = f'/tmp/{md5(file_id)}.mp4'
                if os.path.exists(cached):
                    # Already on disk: reuse it instead of downloading again.
                    pending.append(cached)
                    continue
                pending.append(
                    executor.submit(gdown.download, id=file_id, output=cached)
                )

            for item in pending:
                path = item if isinstance(item, str) else item.result()
                if path:  # drop downloads that produced no file
                    videos.append(path)

            print('clips', videos)

    if not videos:
        raise gr.Error("No video clips found.")

    audio_file = download_file(audio_link, output=f'/tmp/{uid}.mp3')

    if not audio_file:
        raise gr.Error("Failed to download audio file.")

    sub_file = download_file(sub_link, output=f'/tmp/{uid}.txt')

    if not sub_file:
        raise gr.Error("Failed to download subtitle file.")

    os.makedirs(f"/tmp/{uid}", exist_ok=True)
    video_file = concat_video(videos, secs=dur(audio_file), dir=f"/tmp/{uid}")

    output = f"/tmp/{uid}.mp4"
    concat_audio(video_file, audio_file, sub_file, output=output)

    return output


# Blocks-based layout: a header row (title + RUN button) whose button calls
# render_video with three text inputs and shows the result in a video player.
# NOTE(review): the name `demo` bound here is reassigned by the
# `demo = gr.Interface(...)` statement later in this file, so `demo.launch()`
# starts that Interface rather than this layout — confirm which UI is intended.
with gr.Blocks(title="Render video from clips and audio", analytics_enabled=True) as demo:
    with gr.Row():
        with gr.Column(scale=3):
            gr.Markdown("# Render video")
        with gr.Column(scale=1):
            btn_submit = gr.Button("RUN", variant="primary")

    # Input/output components are created inline, in the same order as
    # render_video's parameters (clips, audio, subtitle).
    btn_submit.click(fn=render_video, inputs=[
        gr.Textbox(label="Clips", max_lines=1, placeholder="https://drive.google.com/drive/folders/...", info="Link to the folder containing video clips", show_copy_button=True),
        gr.Textbox(label="Audio", max_lines=1, placeholder="https://...", info="Link to the audio file", show_copy_button=True),
        gr.Textbox(label="Subtitle", max_lines=1, placeholder="https://...", info="Link to the subtitle file", show_copy_button=True),
    ], outputs=[
        gr.Video(label="Output video", show_download_button=True, height=240 )
    ])

# Interface-based UI around render_video. This assignment rebinds `demo`, so
# it is the object launched by the entry-point guard below.
demo = gr.Interface(
    title="Render video from clips and audio",
    fn=render_video,
    clear_btn=None,
    # Inputs are listed in the order of render_video's parameters:
    # clip folder link, audio link, subtitle link.
    inputs=[
        gr.Textbox(label="Clips", max_lines=1, placeholder="https://drive.google.com/drive/folders/...", info="Link to the folder containing video clips", show_copy_button=True),
        gr.Textbox(label="Audio", max_lines=1, placeholder="https://...", info="Link to the audio file", show_copy_button=True),
        gr.Textbox(label="Subtitle", max_lines=1, placeholder="https://...", info="Link to the subtitle file", show_copy_button=True),
    ],
    # render_video returns a file path, which is shown in this video component.
    outputs=[
        gr.Video(label="Output video", show_download_button=True, height=240 )
    ],
    allow_flagging="never",
    api_name="render",
)

# Start the app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch(show_error=True)