Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import tempfile | |
| import warnings | |
| import time | |
| import shutil | |
| import requests | |
| from urllib.parse import urlparse, unquote | |
| from pathlib import Path | |
| import torch | |
| import torchaudio | |
| import yt_dlp | |
| from contextlib import contextmanager | |
| warnings.filterwarnings("ignore") | |
| os.environ['HF_HUB_DISABLE_SYMLINKS_WARNING'] = '1' | |
| def suppress_stdout_stderr(): | |
| with open(os.devnull, "w") as devnull: | |
| old_stdout = sys.stdout | |
| old_stderr = sys.stderr | |
| sys.stdout = devnull | |
| sys.stderr = devnull | |
| try: | |
| yield | |
| finally: | |
| sys.stdout = old_stdout | |
| sys.stderr = old_stderr | |
| class SimpleAudioExtractor: | |
| def __init__(self): | |
| self.supported_video_formats = ['.mp4', '.webm', '.avi', '.mov', '.mkv', '.m4v'] | |
| self.supported_audio_formats = ['.mp3', '.wav', '.m4a', '.aac', '.ogg', '.flac'] | |
| self.user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' | |
| def extract_audio_from_source(self, source): | |
| """Extract audio from file path, direct media URL, or Loom URL""" | |
| start_time = time.time() | |
| # Check if source is a file path | |
| if self._is_file_path(source): | |
| print(f"π Processing uploaded file: {source}") | |
| return self._process_local_file(source, start_time) | |
| # Check if source is a direct media URL | |
| if self._is_direct_media_url(source): | |
| print(f"π Processing direct media URL: {source}") | |
| return self._download_direct_media(source, start_time) | |
| # Check if source is a Loom URL | |
| if self._is_loom_url(source): | |
| print(f"π₯ Processing Loom URL: {source}") | |
| return self._extract_from_loom(source, start_time) | |
| raise Exception("Unsupported URL format. Please use Loom URLs or direct media links.") | |
| def _is_file_path(self, source): | |
| """Check if source is a local file path""" | |
| try: | |
| path = Path(source) | |
| return path.exists() and path.is_file() | |
| except: | |
| return False | |
| def _is_direct_media_url(self, url): | |
| """Check if URL points directly to a media file""" | |
| try: | |
| parsed = urlparse(url.lower()) | |
| path = unquote(parsed.path) | |
| return any(path.endswith(ext) for ext in self.supported_video_formats + self.supported_audio_formats) | |
| except: | |
| return False | |
| def _is_loom_url(self, url): | |
| """Check if URL is a Loom video""" | |
| return 'loom.com' in url.lower() | |
| def _process_local_file(self, file_path, start_time): | |
| """Process a local file (uploaded file)""" | |
| try: | |
| file_ext = Path(file_path).suffix.lower() | |
| # If it's already an audio file, convert to WAV if needed | |
| if file_ext in self.supported_audio_formats: | |
| if file_ext == '.wav': | |
| end_time = time.time() | |
| print(f"[β±οΈ] Audio file processing took {end_time - start_time:.2f} seconds.") | |
| return file_path | |
| else: | |
| return self._convert_to_wav(file_path, start_time) | |
| # If it's a video file, extract audio | |
| elif file_ext in self.supported_video_formats: | |
| return self._extract_audio_from_video_file(file_path, start_time) | |
| else: | |
| raise Exception(f"Unsupported file format: {file_ext}") | |
| except Exception as e: | |
| raise Exception(f"Failed to process local file: {str(e)}") | |
| def _download_direct_media(self, url, start_time): | |
| """Download direct media URL""" | |
| temp_dir = tempfile.mkdtemp() | |
| try: | |
| headers = { | |
| 'User-Agent': self.user_agent, | |
| 'Accept': '*/*', | |
| 'Accept-Language': 'en-US,en;q=0.9', | |
| 'Connection': 'keep-alive', | |
| } | |
| response = requests.get(url, headers=headers, stream=True, timeout=60) | |
| response.raise_for_status() | |
| # Determine file extension from URL or content type | |
| parsed_url = urlparse(url) | |
| url_ext = Path(parsed_url.path).suffix.lower() | |
| if url_ext in self.supported_video_formats + self.supported_audio_formats: | |
| ext = url_ext | |
| else: | |
| # Try to get from content type | |
| content_type = response.headers.get('content-type', '').lower() | |
| if 'video' in content_type: | |
| ext = '.mp4' | |
| elif 'audio' in content_type: | |
| ext = '.mp3' | |
| else: | |
| ext = '.mp4' # default | |
| downloaded_file = os.path.join(temp_dir, f'downloaded{ext}') | |
| with open(downloaded_file, 'wb') as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| if chunk: | |
| f.write(chunk) | |
| print(f"β Downloaded {os.path.getsize(downloaded_file) / 1024 / 1024:.1f}MB") | |
| # Process the downloaded file | |
| if ext in self.supported_audio_formats: | |
| if ext == '.wav': | |
| end_time = time.time() | |
| print(f"[β±οΈ] Direct audio download took {end_time - start_time:.2f} seconds.") | |
| return downloaded_file | |
| else: | |
| return self._convert_to_wav(downloaded_file, start_time) | |
| else: | |
| return self._extract_audio_from_video_file(downloaded_file, start_time) | |
| except Exception as e: | |
| if os.path.exists(temp_dir): | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| raise Exception(f"Failed to download direct media: {str(e)}") | |
| def _extract_from_loom(self, url, start_time): | |
| """Extract audio from Loom URL using yt-dlp""" | |
| temp_dir = tempfile.mkdtemp() | |
| try: | |
| ydl_opts = { | |
| 'format': 'bestaudio/best', | |
| 'postprocessors': [{ | |
| 'key': 'FFmpegExtractAudio', | |
| 'preferredcodec': 'wav', | |
| 'preferredquality': '192', | |
| }], | |
| 'outtmpl': os.path.join(temp_dir, 'loom_audio.%(ext)s'), | |
| 'quiet': True, | |
| 'no_warnings': True, | |
| 'noplaylist': True, | |
| 'http_headers': { | |
| 'User-Agent': self.user_agent, | |
| }, | |
| } | |
| with suppress_stdout_stderr(): | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| ydl.download([url]) | |
| # Find the extracted audio file | |
| for file in os.listdir(temp_dir): | |
| if file.endswith('.wav'): | |
| audio_path = os.path.join(temp_dir, file) | |
| end_time = time.time() | |
| print(f"[β±οΈ] Loom audio extraction took {end_time - start_time:.2f} seconds.") | |
| return audio_path | |
| raise Exception("Audio file not found after Loom extraction") | |
| except Exception as e: | |
| if os.path.exists(temp_dir): | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| raise Exception(f"Failed to extract from Loom: {str(e)}") | |
| def _extract_audio_from_video_file(self, video_file, start_time): | |
| """Extract audio from video file using FFmpeg or torchaudio""" | |
| temp_dir = tempfile.mkdtemp() | |
| output_audio = os.path.join(temp_dir, 'extracted_audio.wav') | |
| try: | |
| # Try FFmpeg first | |
| import subprocess | |
| cmd = [ | |
| 'ffmpeg', '-i', video_file, | |
| '-vn', # no video | |
| '-acodec', 'pcm_s16le', # uncompressed WAV | |
| '-ar', '16000', # 16kHz sample rate | |
| '-ac', '1', # mono | |
| '-y', # overwrite output file | |
| output_audio | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) | |
| if result.returncode == 0 and os.path.exists(output_audio): | |
| end_time = time.time() | |
| print(f"[β±οΈ] Audio extraction from video took {end_time - start_time:.2f} seconds.") | |
| return output_audio | |
| else: | |
| raise Exception("FFmpeg failed, trying torchaudio...") | |
| except (FileNotFoundError, Exception): | |
| # Fallback to torchaudio | |
| return self._convert_to_wav(video_file, start_time) | |
| def _convert_to_wav(self, audio_file, start_time): | |
| """Convert audio file to WAV format using torchaudio""" | |
| try: | |
| waveform, sample_rate = torchaudio.load(audio_file) | |
| # Convert to mono if needed | |
| if waveform.shape[0] > 1: | |
| waveform = torch.mean(waveform, dim=0, keepdim=True) | |
| # Resample to 16kHz if needed | |
| if sample_rate != 16000: | |
| waveform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(waveform) | |
| # Save as WAV | |
| temp_dir = tempfile.mkdtemp() | |
| output_wav = os.path.join(temp_dir, 'converted_audio.wav') | |
| torchaudio.save(output_wav, waveform, 16000) | |
| end_time = time.time() | |
| print(f"[β±οΈ] Audio conversion took {end_time - start_time:.2f} seconds.") | |
| return output_wav | |
| except Exception as e: | |
| raise Exception(f"Failed to convert audio to WAV: {str(e)}") | |
| def chunk_audio_1min(waveform, sample_rate, short_audio_threshold=30): | |
| """Create 1-minute chunks from audio, handle short audio as single chunk""" | |
| total_samples = waveform.size(1) | |
| duration_sec = total_samples / sample_rate | |
| # If audio is short (β€30 seconds by default), return as single chunk | |
| if duration_sec <= short_audio_threshold: | |
| print(f"π¦ Short audio ({duration_sec:.2f}s), keeping as single chunk") | |
| return [waveform] | |
| # For longer audio, use 1-minute chunks | |
| chunk_length_sec = 60 # 1 minute chunks | |
| chunk_samples = chunk_length_sec * sample_rate | |
| chunks = [] | |
| for start in range(0, total_samples, chunk_samples): | |
| end = min(start + chunk_samples, total_samples) | |
| chunk = waveform[:, start:end] | |
| # Only include chunks that are at least 10 seconds long | |
| if chunk.size(1) > sample_rate * 10: | |
| chunks.append(chunk) | |
| print(f"π¦ Created {len(chunks)} 1-minute chunks") | |
| return chunks | |
| def prepare_audio(video_source, short_audio_threshold=30): | |
| """Main function to extract and prepare audio chunks, handling short audio as single segment""" | |
| try: | |
| print(f"π΅ Extracting audio from source...") | |
| extractor = SimpleAudioExtractor() | |
| audio_path = extractor.extract_audio_from_source(video_source) | |
| print(f"β Audio extracted to: {audio_path}") | |
| print(f"π― Loading and preparing audio...") | |
| start = time.time() | |
| waveform, sample_rate = torchaudio.load(audio_path) | |
| # Resample to 16kHz if needed | |
| if sample_rate != 16000: | |
| waveform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)(waveform) | |
| sample_rate = 16000 | |
| # Convert to mono if needed | |
| if waveform.shape[0] > 1: | |
| waveform = torch.mean(waveform, dim=0, keepdim=True) | |
| end = time.time() | |
| print(f"[β±οΈ] Audio preparation took {end - start:.2f} seconds.") | |
| # Calculate duration and create chunks | |
| duration_minutes = waveform.size(1) / sample_rate / 60 | |
| print(f"π§© Creating chunks (short audio threshold: {short_audio_threshold}s)...") | |
| start = time.time() | |
| chunks = chunk_audio_1min(waveform, sample_rate, short_audio_threshold) | |
| end = time.time() | |
| print(f"[β±οΈ] Chunking took {end - start:.2f} seconds. Total chunks: {len(chunks)}") | |
| return { | |
| "success": True, | |
| "chunks": chunks, | |
| "audio_path": audio_path, | |
| "duration_minutes": duration_minutes, | |
| "total_chunks": len(chunks) | |
| } | |
| except Exception as e: | |
| print(f"β Error in audio preparation.: {str(e)}") | |
| return { | |
| "success": False, | |
| "error": str(e), | |
| "chunks": [], | |
| "audio_path": None | |
| } |