# Hugging Face Space file view (Space status: Sleeping) - file size: 5,787 bytes, revision 9852ddc
import random
import os
import tempfile
import subprocess
import gradio as gr
from daggr import GradioNode, Graph, FnNode
from daggr.state import get_daggr_files_dir
import yt_dlp
def download_youtube_video(video_url: str) -> str:
    """
    Download a YouTube video and return the path to the resulting MP4 file.

    The file is written into the directory returned by
    ``get_daggr_files_dir()`` and named from the ``%(title)s.%(ext)s``
    yt-dlp output template. When the file found on disk is not an MP4
    container, it is remuxed to MP4 with ffmpeg (stream copy, no re-encode)
    and the intermediate file is removed.

    Args:
        video_url: The YouTube video URL. Must not be empty or blank.

    Returns:
        Path to the downloaded MP4 file.

    Raises:
        ValueError: If ``video_url`` is empty or only whitespace.
        Exception: If the download or the remux to MP4 fails. The underlying
            error is attached as ``__cause__``.
    """
    # Fail fast with a clear message instead of letting yt-dlp choke on an
    # empty string and surface an opaque extractor error.
    if not video_url or not video_url.strip():
        raise ValueError("A video URL is required.")

    output_path = os.path.join(get_daggr_files_dir(), "%(title)s.%(ext)s")
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': output_path,
        'merge_output_format': 'mp4',
        'quiet': True,
        'no_warnings': True,
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url.strip(), download=True)
            filename = ydl.prepare_filename(info)

        if filename.endswith('.mp4'):
            return filename

        # The reported name does not end in .mp4, so probe the disk for the
        # file that was actually written (presumably the extension can differ
        # after merging; confirm against yt-dlp behaviour).
        base_name = os.path.splitext(filename)[0]
        for ext in ('.mp4', '.mkv', '.webm'):
            potential_file = base_name + ext
            if not os.path.exists(potential_file):
                continue
            if ext == '.mp4':
                return potential_file

            # Remux into an MP4 container without re-encoding. The global
            # "-y" (overwrite) flag goes first, as the ffmpeg docs require.
            mp4_file = base_name + '.mp4'
            subprocess.run(
                ['ffmpeg', '-y', '-i', potential_file, '-c', 'copy', mp4_file],
                check=True,
                capture_output=True,
            )
            os.remove(potential_file)
            return mp4_file

        # Nothing matched on disk: fall back to the name yt-dlp reported.
        return filename
    except Exception as e:
        # Keep the original traceback reachable through explicit chaining.
        raise Exception(f"Error downloading video: {str(e)}") from e
def convert_mp4_to_mp3(original_video: str) -> str:
    """
    Extract the audio track of a video file into an MP3 file using ffmpeg.

    The MP3 is written next to the input file, with the same base name and a
    ``.mp3`` extension. An existing file at that path is overwritten
    (ffmpeg is invoked with ``-y``).

    Args:
        original_video: Path to the MP4 video file.

    Returns:
        Path to the converted MP3 audio file.

    Raises:
        FileNotFoundError: If ``original_video`` does not exist.
        Exception: If ffmpeg is not installed, or if ffmpeg exits with a
            non-zero status. The underlying error is attached as
            ``__cause__`` and the tail of ffmpeg's stderr is included in the
            message.
    """
    if not os.path.exists(original_video):
        raise FileNotFoundError(f"Video file not found: {original_video}")

    base_name = os.path.splitext(original_video)[0]
    mp3_path = base_name + '.mp3'

    command = [
        'ffmpeg',
        '-i', original_video,
        '-vn',                    # drop the video stream
        '-acodec', 'libmp3lame',  # encode audio as MP3
        '-ab', '192k',            # audio bitrate
        '-ar', '44100',           # audio sample rate (Hz)
        '-y',                     # overwrite the output file if present
        mp3_path,
    ]

    try:
        subprocess.run(command, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # stderr is captured, so surface it: str(e) alone only reports the
        # exit status, which is useless for diagnosing an ffmpeg failure.
        stderr_text = (e.stderr or b"").decode(errors="replace").strip()
        detail = f"\n{stderr_text[-2000:]}" if stderr_text else ""
        raise Exception(
            f"Error converting video to MP3: {str(e)}{detail}"
        ) from e
    except FileNotFoundError as e:
        # Raised by subprocess when the ffmpeg executable cannot be found.
        raise Exception(
            "ffmpeg not found. Please install ffmpeg to use this function."
        ) from e

    return mp3_path
def replace_audio_video(audio_path: str, video_path: str) -> str:
    """
    Combine an audio file and a video file into a single MP4 video file.

    The video stream is taken from ``video_path`` and copied unchanged; the
    audio stream is taken from ``audio_path`` and encoded to AAC. The result
    is written to ``<video base name>_with_new_audio.mp4`` inside the
    directory returned by ``get_daggr_files_dir()``, overwriting any existing
    file, and ends when the shorter of the two inputs ends.

    Args:
        audio_path: Path to the audio file.
        video_path: Path to the video file.

    Returns:
        Path to the combined video file.

    Raises:
        FileNotFoundError: If ``audio_path`` or ``video_path`` does not exist.
        Exception: If ffmpeg is not installed, or if ffmpeg exits with a
            non-zero status. The underlying error is attached as
            ``__cause__`` and the tail of ffmpeg's stderr is included in the
            message.
    """
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"Audio file not found: {audio_path}")
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    # Generate output filename based on video filename
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    output_filename = f"{base_name}_with_new_audio.mp4"
    output_path = os.path.join(get_daggr_files_dir(), output_filename)

    command = [
        'ffmpeg',
        '-i', video_path,
        '-i', audio_path,
        '-c:v', 'copy',     # Copy video codec
        '-c:a', 'aac',      # Encode audio to AAC for compatibility
        '-map', '0:v:0',    # Map video from first input
        '-map', '1:a:0',    # Map audio from second input
        '-shortest',        # Finish encoding when shortest input ends
        '-y',               # Overwrite output file
        output_path,
    ]

    try:
        subprocess.run(command, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # stderr is captured, so surface it: str(e) alone only reports the
        # exit status, which is useless for diagnosing an ffmpeg failure.
        stderr_text = (e.stderr or b"").decode(errors="replace").strip()
        detail = f"\n{stderr_text[-2000:]}" if stderr_text else ""
        raise Exception(
            f"Error combining audio and video: {str(e)}{detail}"
        ) from e
    except FileNotFoundError as e:
        # Raised by subprocess when the ffmpeg executable cannot be found.
        raise Exception(
            "ffmpeg not found. Please install ffmpeg to use this function."
        ) from e

    return output_path
# Pipeline wiring: each node's outputs are referenced as attributes of the
# node object and fed into the inputs of the nodes that follow it.

# Step 1: download the video for the URL typed into the textbox.
ytVideoDownload = FnNode(
    download_youtube_video,
    inputs={"video_url": gr.Textbox(label="Video URL")},
    outputs={"original_video": gr.Video(label=" Original Video")},
)

# Step 2: pull the audio track out of the downloaded video.
audioConversion = FnNode(
    convert_mp4_to_mp3,
    inputs={"original_video": ytVideoDownload.original_video},
    outputs={"original_audio": gr.Audio(label="Original Audio")},
)

# Step 3: send the audio to the "r3gm/Audio_separator" app, requesting only
# the 'background' stem through its "/sound_separate" endpoint.
audio_segmentation = GradioNode(
    "r3gm/Audio_separator",
    api_name="/sound_separate",
    inputs={
        "media_file": audioConversion.original_audio,
        "stem": ['background'],
    },
    outputs={
        "commentary_removed": gr.Audio(label="Commentary Removed Audio"),
    },
)

# Step 4: put the separated audio back onto the original video stream.
replaceAudioVideo = FnNode(
    replace_audio_video,
    inputs={
        "audio_path": audio_segmentation.commentary_removed,
        "video_path": ytVideoDownload.original_video,
    },
    outputs={"final_video": gr.Video(label="No Commentary Video")},
)

# Assemble the four nodes into the graph and start it.
graph = Graph(
    name="Remove Sports Commentary",
    nodes=[
        ytVideoDownload,
        audioConversion,
        audio_segmentation,
        replaceAudioVideo,
    ],
)
graph.launch()
|