Spaces:
Running
Running
| import random | |
| import os | |
| import tempfile | |
| import subprocess | |
| import gradio as gr | |
| from daggr import GradioNode, Graph, FnNode | |
| from daggr.state import get_daggr_files_dir | |
| import yt_dlp | |
def download_youtube_video(video_url: str) -> str:
    """
    Downloads a YouTube video and returns the path to the MP4 file.

    The file is written into the daggr files directory and named after the
    video title. When the filename reported by yt-dlp does not end in
    ``.mp4``, the function looks next to it for a ``.mp4``, ``.mkv`` or
    ``.webm`` file with the same base name; a non-MP4 hit is remuxed to MP4
    with ffmpeg (``-c copy``) and the source container is deleted.

    Args:
        video_url: The YouTube video URL

    Returns:
        Path to the downloaded MP4 file. If no candidate file is found on
        disk, the filename reported by yt-dlp is returned unchanged.

    Raises:
        Exception: If the download or the remux step fails. The underlying
            error is attached as ``__cause__``.
    """
    output_path = os.path.join(get_daggr_files_dir(), "%(title)s.%(ext)s")
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': output_path,
        'merge_output_format': 'mp4',
        'quiet': True,
        'no_warnings': True,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=True)
            filename = ydl.prepare_filename(info)

        if not filename.endswith('.mp4'):
            # The reported name may differ from what actually landed on disk,
            # so probe the known container extensions for the same base name.
            base_name = os.path.splitext(filename)[0]
            for ext in ('.mp4', '.mkv', '.webm'):
                potential_file = base_name + ext
                if not os.path.exists(potential_file):
                    continue
                if ext == '.mp4':
                    return potential_file
                # Remux into an MP4 container, then drop the original file.
                mp4_file = base_name + '.mp4'
                subprocess.run(
                    ['ffmpeg', '-i', potential_file, '-c', 'copy', mp4_file, '-y'],
                    check=True,
                    capture_output=True,
                )
                os.remove(potential_file)
                return mp4_file
        return filename
    except Exception as e:
        # Chain explicitly so the original traceback stays attached.
        raise Exception(f"Error downloading video: {e}") from e
def convert_mp4_to_mp3(original_video: str) -> str:
    """
    Converts an MP4 video file to MP3 audio file.

    The audio track is encoded with libmp3lame at 192 kbit/s and 44.1 kHz.
    The output is written next to the input, using the same base name with
    an ``.mp3`` extension, and overwrites any existing file at that path.

    Args:
        original_video: Path to the MP4 video file

    Returns:
        Path to the converted MP3 audio file

    Raises:
        FileNotFoundError: If ``original_video`` does not exist.
        Exception: If ffmpeg is not installed, or if ffmpeg exits with a
            non-zero status (the message then includes ffmpeg's stderr).
            The underlying error is attached as ``__cause__``.
    """
    if not os.path.exists(original_video):
        raise FileNotFoundError(f"Video file not found: {original_video}")

    mp3_path = os.path.splitext(original_video)[0] + '.mp3'
    command = [
        'ffmpeg',
        '-i', original_video,
        '-vn',
        '-acodec', 'libmp3lame',
        '-ab', '192k',
        '-ar', '44100',
        '-y',
        mp3_path,
    ]
    try:
        subprocess.run(command, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # stderr was captured, so surface it; otherwise the caller only sees
        # the exit status and has no idea why ffmpeg failed.
        message = f"Error converting video to MP3: {e}"
        stderr_text = e.stderr.decode(errors="replace").strip() if e.stderr else ""
        if stderr_text:
            message = f"{message}\n{stderr_text}"
        raise Exception(message) from e
    except FileNotFoundError as e:
        # The input file was verified above, so this means the ffmpeg
        # executable itself could not be launched.
        raise Exception("ffmpeg not found. Please install ffmpeg to use this function.") from e
    return mp3_path
def replace_audio_video(audio_path: str, video_path: str) -> str:
    """
    Combines an audio file and a video file into a single video file.

    The first video stream of ``video_path`` is copied as-is and the first
    audio stream of ``audio_path`` is encoded to AAC. Output stops at the
    end of the shorter input. The result is written to the daggr files
    directory as ``<video base name>_with_new_audio.mp4``, overwriting any
    existing file with that name.

    Args:
        audio_path: Path to the audio file
        video_path: Path to the video file

    Returns:
        Path to the combined video file

    Raises:
        FileNotFoundError: If ``audio_path`` or ``video_path`` does not
            exist (the audio path is checked first).
        Exception: If ffmpeg is not installed, or if ffmpeg exits with a
            non-zero status (the message then includes ffmpeg's stderr).
            The underlying error is attached as ``__cause__``.
    """
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"Audio file not found: {audio_path}")
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    # Generate output filename based on video filename
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    output_path = os.path.join(
        get_daggr_files_dir(), f"{base_name}_with_new_audio.mp4"
    )

    command = [
        'ffmpeg',
        '-i', video_path,
        '-i', audio_path,
        '-c:v', 'copy',    # Copy video codec
        '-c:a', 'aac',     # Encode audio to AAC for compatibility
        '-map', '0:v:0',   # Map video from first input
        '-map', '1:a:0',   # Map audio from second input
        '-shortest',       # Finish encoding when shortest input ends
        '-y',              # Overwrite output file
        output_path,
    ]
    try:
        subprocess.run(command, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # stderr was captured, so surface it; otherwise the caller only sees
        # the exit status and has no idea why ffmpeg failed.
        message = f"Error combining audio and video: {e}"
        stderr_text = e.stderr.decode(errors="replace").strip() if e.stderr else ""
        if stderr_text:
            message = f"{message}\n{stderr_text}"
        raise Exception(message) from e
    except FileNotFoundError as e:
        # Both inputs were verified above, so this means the ffmpeg
        # executable itself could not be launched.
        raise Exception("ffmpeg not found. Please install ffmpeg to use this function.") from e
    return output_path
# Step 1: download the video for the URL entered in the textbox.
ytVideoDownload = FnNode(
    download_youtube_video,
    inputs=dict(video_url=gr.Textbox(label="Video URL")),
    outputs=dict(original_video=gr.Video(label=" Original Video")),
)

# Step 2: extract the downloaded video's audio track as an MP3.
audioConversion = FnNode(
    convert_mp4_to_mp3,
    inputs=dict(original_video=ytVideoDownload.original_video),
    outputs=dict(original_audio=gr.Audio(label="Original Audio")),
)

# Step 3: send the MP3 to the /sound_separate endpoint of the
# r3gm/Audio_separator Space, requesting only the 'background' stem.
audio_segmentation = GradioNode(
    "r3gm/Audio_separator",
    api_name="/sound_separate",
    inputs=dict(
        media_file=audioConversion.original_audio,
        stem=['background'],
    ),
    outputs=dict(
        commentary_removed=gr.Audio(label="Commentary Removed Audio"),
    ),
)

# Step 4: put the separated audio back onto the original video stream.
replaceAudioVideo = FnNode(
    replace_audio_video,
    inputs=dict(
        audio_path=audio_segmentation.commentary_removed,
        video_path=ytVideoDownload.original_video,
    ),
    outputs=dict(final_video=gr.Video(label="No Commentary Video")),
)

# Assemble the four steps, in pipeline order, into one graph and serve it.
graph = Graph(
    name="Remove Sports Commentary",
    nodes=[
        ytVideoDownload,
        audioConversion,
        audio_segmentation,
        replaceAudioVideo,
    ],
)
graph.launch()