Spaces:
Sleeping
Sleeping
import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
import textwrap
from concurrent.futures import ThreadPoolExecutor
from hashlib import md5

import browser_cookie3
import whisper
from transformers import pipeline


class VideoProcessor:
    """Download a video's audio, transcribe it with Whisper, and summarize it.

    The public entry point is :meth:`process`, which returns a dict with the
    keys ``summary``, ``key_points`` and ``transcript`` on success, or a dict
    with the single key ``error`` on failure.
    """

    # Prefix of the per-download scratch directories. Cleanup refuses to
    # delete any directory whose name does not start with it.
    _WORK_DIR_PREFIX = "video_audio_"

    def __init__(self):
        """Build the summarization pipeline and the (empty) Whisper model cache."""
        self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
        self.models = {}  # model size -> loaded Whisper model
        self.cookie_file = "cookies.txt"  # Path to your cookies file

    def load_model(self, model_size="base"):
        """Return the Whisper model for *model_size*, loading it on first use."""
        if model_size not in self.models:
            self.models[model_size] = whisper.load_model(model_size)
        return self.models[model_size]

    def _run_yt_dlp(self, url, extra_args, failure_label):
        """Run ``yt-dlp`` into a fresh scratch directory and return the mp3 path.

        Args:
            url: Video URL handed to ``yt-dlp``.
            extra_args: Additional command-line options placed before the
                common audio-extraction options.
            failure_label: Prefix of the error message when ``yt-dlp`` exits
                with a non-zero status.

        Raises:
            RuntimeError: If ``yt-dlp`` fails or produces no ``.mp3`` file.
        """
        work_dir = tempfile.mkdtemp(prefix=self._WORK_DIR_PREFIX)
        cmd = [
            "yt-dlp",
            *extra_args,
            "--extract-audio",
            "--audio-format", "mp3",
            "--quiet",
            "-o", os.path.join(work_dir, "audio.%(ext)s"),
            "--",  # end of options: a URL starting with "-" is not read as a flag
            url,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                raise RuntimeError(f"{failure_label}: {result.stderr}")
            # Search only this download's directory so a stale mp3 left in
            # the system temp dir can never be picked up by mistake.
            return self._find_downloaded_file(work_dir)
        except BaseException:
            # Nothing usable was produced; do not leak the scratch directory.
            shutil.rmtree(work_dir, ignore_errors=True)
            raise

    def _download_with_cookies(self, url):
        """Method 1: Download using the cookies file at ``self.cookie_file``."""
        return self._run_yt_dlp(
            url,
            ["--cookies", self.cookie_file, "--audio-quality", "0"],
            "Cookie download failed",
        )

    def _download_with_yt_dlp(self, url):
        """Method 2: Regular download without cookies."""
        return self._run_yt_dlp(url, [], "Download failed")

    def _find_downloaded_file(self, search_dir=None):
        """Return the path of the first ``.mp3`` file found under *search_dir*.

        Args:
            search_dir: Directory to walk. When omitted, the system temp
                directory is walked, which is the legacy behavior and may
                return an unrelated file; callers should pass the directory
                they downloaded into.

        Raises:
            RuntimeError: If no ``.mp3`` file is found.
        """
        if search_dir is None:
            search_dir = tempfile.gettempdir()
        for root, _, files in os.walk(search_dir):
            for file_name in files:
                if file_name.endswith('.mp3'):
                    return os.path.join(root, file_name)
        raise RuntimeError("Downloaded audio file not found")

    def _cleanup_audio(self, audio_path):
        """Delete the scratch directory that holds *audio_path*, if it is ours.

        Only directories created by :meth:`_run_yt_dlp` (recognized by their
        name prefix) are removed, so a path returned by an overridden
        download method is never deleted.
        """
        work_dir = os.path.dirname(os.path.abspath(audio_path))
        if os.path.basename(work_dir).startswith(self._WORK_DIR_PREFIX):
            shutil.rmtree(work_dir, ignore_errors=True)

    def download_audio(self, url, use_cookies=False):
        """Download the audio track of *url*, falling back between methods.

        The cookie-based download is tried first when *use_cookies* is true
        and the cookie file exists; if it fails, the regular download is
        tried next.

        Returns:
            Path to the downloaded ``.mp3`` file.

        Raises:
            RuntimeError: If every attempted method failed. The message lists
                each failure and the last one is chained as the cause.
        """
        methods = []
        if use_cookies and os.path.exists(self.cookie_file):
            methods.append(self._download_with_cookies)
        methods.append(self._download_with_yt_dlp)

        failures = []
        last_error = None
        for method in methods:
            try:
                return method(url)
            except Exception as exc:
                failures.append(str(exc))
                last_error = exc
        raise RuntimeError(
            f"All download methods failed: {'; '.join(failures)}"
        ) from last_error

    def transcribe_audio(self, audio_path, model_size="base"):
        """Transcribe *audio_path* with the Whisper model of *model_size*."""
        model = self.load_model(model_size)
        result = model.transcribe(audio_path)
        return result["text"]

    def clean_transcript(self, text):
        """Remove the filler words um/uh/like/you know and collapse whitespace."""
        text = re.sub(r'\b(um|uh|like|you know)\b', '', text, flags=re.IGNORECASE)
        return re.sub(r'\s+', ' ', text).strip()

    def summarize_chunk(self, chunk):
        """Summarize one chunk of text with the summarization pipeline."""
        # truncation=True asks the pipeline to clip over-long inputs rather
        # than fail when a caller picks a very large chunk size.
        output = self.summarizer(
            chunk, max_length=150, min_length=30, truncation=True
        )
        return output[0]['summary_text']

    def _split_into_chunks(self, text, chunk_size):
        """Split *text* into pieces of at most *chunk_size* characters.

        Pieces end on whitespace where possible so words are not cut in
        half; a single word longer than *chunk_size* is still split.

        Raises:
            ValueError: If *chunk_size* is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")
        return textwrap.wrap(text, width=chunk_size)

    def summarize_text(self, text, chunk_size=1000):
        """Clean *text*, summarize it chunk by chunk, and join the summaries.

        Returns:
            The chunk summaries joined by newlines; an empty string when the
            cleaned text is empty.

        Raises:
            ValueError: If *chunk_size* is smaller than 1.
        """
        chunks = self._split_into_chunks(self.clean_transcript(text), chunk_size)
        if not chunks:
            return ""
        with ThreadPoolExecutor(max_workers=4) as executor:
            summaries = list(executor.map(self.summarize_chunk, chunks))
        return "\n".join(summaries)

    def extract_key_points(self, text):
        """Produce a bulleted list of key points from the transcript *text*.

        Only the first 8000 characters of *text* are included in the prompt.
        Returns an empty string for a blank transcript.
        """
        if not text or not text.strip():
            return ""
        prompt = (
            "Extract 5-7 key points from this transcript. Each point should:\n"
            "- Start with a bullet (-)\n"
            "- Be concise but specific\n"
            "- Include numbers/dates when mentioned\n"
            "Transcript:\n"
            f"{text[:8000]}\n"
            "Key Points:"
        )
        # The prompt can be far longer than a summarization chunk, so let the
        # pipeline truncate it instead of failing on over-long input.
        result = self.summarizer(
            prompt, max_length=300, min_length=100, truncation=True
        )[0]['summary_text']
        # Prefix every line that starts with a word character with "- ".
        return re.sub(r'(^|\n)(?=\w)', '\n- ', result)

    def get_video_id(self, url):
        """Return the MD5 hex digest of *url*, used as the cache key."""
        return md5(url.encode()).hexdigest()

    def _read_cache(self, cache_file):
        """Return the JSON content of *cache_file*, or None if unusable.

        A missing, unreadable or corrupt cache file is treated as a miss so
        the result is recomputed instead of crashing the caller.
        """
        try:
            with open(cache_file, encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return None

    def _write_cache(self, cache_file, result):
        """Best-effort write of *result* as JSON to *cache_file*."""
        try:
            with open(cache_file, 'w', encoding="utf-8") as f:
                json.dump(result, f)
        except OSError as exc:
            # Caching is an optimization; never lose a finished result to it.
            logging.getLogger(__name__).warning(
                "Could not write cache file %s: %s", cache_file, exc
            )

    def process(self, youtube_url, chunk_size=1000, model_size="base", use_cookies=False):
        """Download, transcribe and summarize the video at *youtube_url*.

        Results are cached in ``cache_<md5 of url>.json`` in the current
        working directory. The cache key is derived from the URL only, so a
        cached result is returned even when *chunk_size* or *model_size*
        differ from the call that produced it.

        Returns:
            ``{'summary', 'key_points', 'transcript'}`` on success, where
            ``transcript`` is cut to 2000 characters plus ``"..."`` when
            longer; ``{'error': message}`` if any step fails.
        """
        video_id = self.get_video_id(youtube_url)
        cache_file = f"cache_{video_id}.json"
        cached = self._read_cache(cache_file)
        if cached is not None:
            return cached

        audio_path = None
        try:
            audio_path = self.download_audio(youtube_url, use_cookies)
            transcript = self.transcribe_audio(audio_path, model_size)
            result = {
                'summary': self.summarize_text(transcript, chunk_size),
                'key_points': self.extract_key_points(transcript),
                'transcript': transcript[:2000] + ("..." if len(transcript) > 2000 else "")
            }
        except Exception as e:
            return {'error': str(e)}
        finally:
            # The audio is only needed for transcription; remove it either way.
            if audio_path is not None:
                self._cleanup_audio(audio_path)

        self._write_cache(cache_file, result)
        return result