"""Daggr app that removes commentary from a YouTube video.

Pipeline, as wired by the nodes at the bottom of this file:

1. download the video with yt-dlp,
2. extract its audio track to MP3 with ffmpeg,
3. send the MP3 to the ``r3gm/Audio_separator`` Space (``/sound_separate``)
   requesting the ``background`` stem,
4. mux the returned audio onto the original video stream with ffmpeg.
"""

import random
import os
import tempfile
import subprocess

import gradio as gr
from daggr import GradioNode, Graph, FnNode
from daggr.state import get_daggr_files_dir
import yt_dlp


def _run_ffmpeg(*args: str, failure_message: str) -> None:
    """
    Runs ffmpeg with the given arguments and raises on failure.

    Args:
        *args: Command-line arguments placed after the ``ffmpeg`` executable
        failure_message: Prefix of the error raised when ffmpeg exits non-zero

    Raises:
        Exception: If ffmpeg exits with a non-zero status (the message carries
            the tail of ffmpeg's stderr) or if the ffmpeg executable is missing.
            The original error is chained as ``__cause__``.
    """
    try:
        subprocess.run(['ffmpeg', *args], check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # Output is captured, so without this the real reason for the failure
        # would be thrown away. Only the tail is kept: ffmpeg prints its banner
        # first and the actual error last.
        stderr_text = (e.stderr or b"").decode("utf-8", errors="replace").strip()
        stderr_tail = "\n".join(stderr_text.splitlines()[-15:])
        detail = f"{e}\n{stderr_tail}" if stderr_tail else str(e)
        raise Exception(f"{failure_message}: {detail}") from e
    except FileNotFoundError as e:
        # subprocess raises FileNotFoundError when the executable is not on PATH.
        raise Exception(
            "ffmpeg not found. Please install ffmpeg to use this function."
        ) from e


def _ensure_mp4(filename: str) -> str:
    """
    Resolves the file yt-dlp produced to an MP4 path, remuxing if needed.

    Args:
        filename: The file name reported by yt-dlp for the download

    Returns:
        ``filename`` itself when it already ends in ``.mp4``; otherwise the
        first existing sibling among ``.mp4``, ``.mkv`` and ``.webm`` (the
        latter two are stream-copied into a new ``.mp4`` and then deleted).
        If none of those exist, ``filename`` is returned unchanged.
    """
    if filename.endswith('.mp4'):
        return filename

    # NOTE(review): presumably needed because the reported name can carry a
    # different extension than the file left on disk after merging -- confirm
    # against the yt-dlp documentation.
    base_name = os.path.splitext(filename)[0]
    for ext in ('.mp4', '.mkv', '.webm'):
        candidate = base_name + ext
        if not os.path.exists(candidate):
            continue
        if ext == '.mp4':
            return candidate

        mp4_file = base_name + '.mp4'
        # Stream copy (no re-encode) into an MP4 container.
        _run_ffmpeg(
            '-i', candidate,
            '-c', 'copy',
            '-y',
            mp4_file,
            failure_message="Error converting video to MP4",
        )
        os.remove(candidate)
        return mp4_file

    # Best effort: nothing matched, hand back whatever yt-dlp reported.
    return filename


def download_youtube_video(video_url: str) -> str:
    """
    Downloads a YouTube video and returns the path to the MP4 file.

    Args:
        video_url: The YouTube video URL

    Returns:
        Path to the downloaded file inside the daggr files directory. This is
        an MP4 whenever one was produced or could be remuxed; otherwise it is
        the file name reported by yt-dlp.

    Raises:
        Exception: If downloading or remuxing fails for any reason. The
            original error is chained as ``__cause__``.
    """
    output_path = os.path.join(get_daggr_files_dir(), "%(title)s.%(ext)s")

    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': output_path,
        'merge_output_format': 'mp4',
        'quiet': True,
        'no_warnings': True,
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=True)
            filename = ydl.prepare_filename(info)
        return _ensure_mp4(filename)
    except Exception as e:
        # Single boundary for every download-related failure; keep the cause
        # attached so the traceback still shows what actually went wrong.
        raise Exception(f"Error downloading video: {str(e)}") from e


def convert_mp4_to_mp3(original_video: str) -> str:
    """
    Converts an MP4 video file to MP3 audio file.

    Args:
        original_video: Path to the MP4 video file

    Returns:
        Path to the converted MP3 audio file (same location and base name as
        the input, with a ``.mp3`` extension)

    Raises:
        FileNotFoundError: If ``original_video`` does not exist
        Exception: If ffmpeg fails or is not installed
    """
    if not os.path.exists(original_video):
        raise FileNotFoundError(f"Video file not found: {original_video}")

    mp3_path = os.path.splitext(original_video)[0] + '.mp3'

    _run_ffmpeg(
        '-i', original_video,
        '-vn',                    # Drop the video stream
        '-acodec', 'libmp3lame',  # Encode audio as MP3
        '-ab', '192k',            # Audio bitrate
        '-ar', '44100',           # Audio sample rate
        '-y',                     # Overwrite output file
        mp3_path,
        failure_message="Error converting video to MP3",
    )
    return mp3_path


def replace_audio_video(audio_path: str, video_path: str) -> str:
    """
    Combines an audio file and a video file into a single video file.

    Args:
        audio_path: Path to the audio file
        video_path: Path to the video file

    Returns:
        Path to the combined video file, written to the daggr files directory
        as ``<video base name>_with_new_audio.mp4``

    Raises:
        FileNotFoundError: If either input file does not exist
        Exception: If ffmpeg fails or is not installed
    """
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"Audio file not found: {audio_path}")
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    # Generate output filename based on video filename
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    output_filename = f"{base_name}_with_new_audio.mp4"
    output_path = os.path.join(get_daggr_files_dir(), output_filename)

    # Map video stream from video file, audio stream from audio file
    # Use copy for video codec, encode audio to aac for compatibility
    _run_ffmpeg(
        '-i', video_path,
        '-i', audio_path,
        '-c:v', 'copy',     # Copy video codec
        '-c:a', 'aac',      # Encode audio to AAC for compatibility
        '-map', '0:v:0',    # Map video from first input
        '-map', '1:a:0',    # Map audio from second input
        '-shortest',        # Finish encoding when shortest input ends
        '-y',               # Overwrite output file
        output_path,
        failure_message="Error combining audio and video",
    )
    return output_path


# Step 1: URL in, downloaded video out.
ytVideoDownload = FnNode(
    download_youtube_video,
    inputs={
        "video_url": gr.Textbox(label="Video URL"),
    },
    outputs={
        "original_video": gr.Video(label=" Original Video"),
    },
)

# Step 2: extract the audio track of the downloaded video.
audioConversion = FnNode(
    convert_mp4_to_mp3,
    inputs={
        "original_video": ytVideoDownload.original_video,
    },
    outputs={
        "original_audio": gr.Audio(label="Original Audio"),
    },
)

# Step 3: call the r3gm/Audio_separator Space, requesting the 'background' stem.
audio_segmentation = GradioNode(
    "r3gm/Audio_separator",
    api_name="/sound_separate",
    inputs={
        "media_file": audioConversion.original_audio,
        "stem": ['background']
    },
    outputs={
        "commentary_removed": gr.Audio(label="Commentary Removed Audio"),
    },
)

# Step 4: put the separated audio back onto the original video.
replaceAudioVideo = FnNode(
    replace_audio_video,
    inputs={
        "audio_path": audio_segmentation.commentary_removed,
        "video_path": ytVideoDownload.original_video,
    },
    outputs={
        "final_video": gr.Video(label="No Commentary Video"),
    },
)

graph = Graph(
    name="Remove Sports Commentary",
    nodes=[ytVideoDownload, audioConversion, audio_segmentation, replaceAudioVideo]
)

# Launched at module level, exactly as before, so existing ways of starting the
# app keep working.
graph.launch()