"""Gradio app that extracts stock-related mentions from YouTube videos.

Pipeline: download the audio track with yt-dlp, transcribe it with OpenAI
Whisper, then scan the transcript with simple regular expressions for
company names, ticker-like symbols, prices and trading verbs.

Every third-party dependency is imported defensively so that the pure
helper functions stay importable (and testable) even when a package is
missing; availability is exposed through the ``*_AVAILABLE`` flags.
"""

import functools
import os
import re
import shutil
import sys
import tempfile
from typing import Optional
from urllib.parse import urlparse

# Try to import required packages with error handling
try:
    import gradio as gr
    GRADIO_AVAILABLE = True
except ImportError as e:
    gr = None
    GRADIO_AVAILABLE = False
    print(f"gradio import error: {e}")

try:
    from yt_dlp import YoutubeDL
    YT_DLP_AVAILABLE = True
except ImportError as e:
    YT_DLP_AVAILABLE = False
    print(f"yt-dlp import error: {e}")

try:
    import whisper
    WHISPER_AVAILABLE = True
except ImportError as e:
    WHISPER_AVAILABLE = False
    print(f"whisper import error: {e}")

print(f"Python version: {sys.version}")
print(f"yt-dlp available: {YT_DLP_AVAILABLE}")
print(f"whisper available: {WHISPER_AVAILABLE}")
print(f"gradio available: {GRADIO_AVAILABLE}")


# --------------------------------------------------------------------------
# Constants
# --------------------------------------------------------------------------

# Browser-like headers sent with every yt-dlp request to reduce bot detection.
_HTTP_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en-us,en;q=0.5',
    'Accept-Encoding': 'gzip,deflate',
    'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
}

# ``availability`` values reported by yt-dlp that make a download pointless.
_UNAVAILABLE_REASONS = {
    'private': "Video is private and cannot be accessed",
    'premium_only': "Video requires premium subscription",
    'subscriber_only': "Video is only available to channel subscribers",
    'needs_auth': "Video requires authentication - try updating cookies",
}

# Host names accepted by process_video (exact match or any sub-domain).
_YOUTUBE_HOSTS = ('youtube.com', 'youtu.be')

# Base name (without extension) yt-dlp is told to use for the audio file.
_AUDIO_STEM = "audio"

# Heuristic patterns used by extract_stock_info_simple.
_COMPANY_RE = re.compile(
    r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?:\s+(?:Inc|Corp|Company|Ltd)\.?)?'
)
_SYMBOL_RE = re.compile(r'\b[A-Z]{2,5}\b')
_PRICE_RE = re.compile(r'\$\d+(?:\.\d{2})?')
_ACTION_RE = re.compile(
    r'\b(?:buy|sell|hold|bullish|bearish|target|stop\s+loss)\b', re.IGNORECASE
)
# Split after sentence punctuation that is followed by whitespace, so that
# decimals such as "$150.25" are not cut in half.
_SENTENCE_SPLIT_RE = re.compile(r'(?<=[.!?])\s+')
_RECOMMENDATION_WORDS = ('buy', 'sell', 'target', 'recommendation')

_APP_CSS = """
.gradio-container {
    max-width: 1200px;
    margin: auto;
}
"""

_INTRO_MARKDOWN = """
# 📈 Stock Recommendation Extractor from YouTube

Extract stock recommendations and trading information from YouTube videos using AI transcription.

**How it works:**
1. Downloads audio from YouTube video
2. Transcribes using OpenAI Whisper
3. Extracts stock-related information

**⚠️ Disclaimer:** This is for educational purposes only. Always do your own research!
"""

_TIPS_MARKDOWN = """
### 💡 Tips:
- **First try "Test Video Access"** to check if video is available
- Works best with financial YouTube channels
- Ensure video has clear audio
- English content works best
- If you get bot detection errors, try updating cookies.txt

### 🎯 Recommended Financial Channels:
- Ben Felix, The Plain Bagel, Two Cents, Graham Stephan
- Make sure videos are public and not age-restricted
"""


# --------------------------------------------------------------------------
# Download
# --------------------------------------------------------------------------

def get_cookies_path() -> Optional[str]:
    """Return the path of a ``cookies.txt`` file, or ``None`` if none exists.

    The current working directory is checked first, then the directory that
    contains this script.
    """
    if os.path.exists('cookies.txt'):
        return 'cookies.txt'

    script_dir = os.path.dirname(os.path.abspath(__file__))
    cookies_path = os.path.join(script_dir, 'cookies.txt')
    if os.path.exists(cookies_path):
        return cookies_path

    return None


def _build_download_options(output_path: str) -> dict:
    """Build the yt-dlp options used to fetch the audio track.

    ``output_path`` is the destination path without extension; yt-dlp
    appends the real extension through the ``outtmpl`` template.
    """
    ydl_opts = {
        'format': 'bestaudio[ext=m4a]/bestaudio/best',
        'outtmpl': output_path + '.%(ext)s',
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
        'ignoreerrors': False,
        # Retry with exponential back-off (capped at 30 s) to ride out
        # transient throttling.
        'extractor_retries': 3,
        'fragment_retries': 3,
        'retry_sleep_functions': {'http': lambda n: min(2 ** n, 30)},
        'geo_bypass': True,
        'geo_bypass_country': 'US',
        # A single header set; the dict literal previously listed this key
        # twice, which silently discarded the first entry.
        'http_headers': dict(_HTTP_HEADERS),
    }

    cookies_path = get_cookies_path()
    if cookies_path:
        ydl_opts['cookiefile'] = cookies_path
        print(f"Using cookies from: {cookies_path}")
    else:
        print("No cookies.txt found - proceeding without cookies")

    return ydl_opts


def _ensure_downloadable(info_dict: dict) -> None:
    """Raise ``Exception`` if the metadata says the video cannot be fetched."""
    reason = _UNAVAILABLE_REASONS.get(info_dict.get('availability'))
    if reason:
        raise Exception(reason)

    live_status = info_dict.get('live_status')
    if live_status == 'is_live':
        raise Exception("Cannot download live streams")
    if live_status == 'was_live':
        print("Note: This was a live stream, trying to download recorded version...")


def _find_downloaded_file(directory: str, stem: str) -> Optional[str]:
    """Return the finished ``<stem>.<ext>`` file inside ``directory``.

    Any extension is accepted because the format selector may resolve to
    containers other than m4a/webm. Partial files such as
    ``audio.m4a.part`` are skipped because their base name differs.
    """
    for name in sorted(os.listdir(directory)):
        base, ext = os.path.splitext(name)
        if base == stem and ext:
            return os.path.join(directory, name)
    return None


def _friendly_download_error(error_msg: str) -> str:
    """Translate a raw yt-dlp error message into a user-facing explanation."""
    if "Sign in to confirm your age" in error_msg:
        return "❌ Video is age-restricted. Please use a different video or update your cookies with an authenticated session."
    if "Private video" in error_msg:
        return "❌ Video is private and cannot be accessed."
    if "This video is unavailable" in error_msg or "Video unavailable" in error_msg:
        return "❌ Video is unavailable. This could be due to:\n• Geographic restrictions\n• Content removed by uploader\n• Copyright issues\n• Try a different video"
    if "This content isn't available" in error_msg:
        return "❌ Content not available in your region or has been restricted. Try:\n• Using a VPN\n• Different video\n• Updating cookies"
    if "blocked" in error_msg.lower():
        return "❌ Access blocked. Try using updated cookies or a different video."
    if "HTTP Error 403" in error_msg:
        return "❌ Access forbidden. Video may be region-locked or require authentication."
    if "HTTP Error 404" in error_msg:
        return "❌ Video not found. It may have been deleted or the URL is incorrect."
    return f"❌ Download failed: {error_msg}"


def download_audio(url: str) -> str:
    """Download the audio track of ``url`` and return the local file path.

    The file is written into a fresh temporary directory. On success the
    caller owns the file (see ``cleanup_file``); on failure the directory
    is removed here so nothing is leaked.

    Raises:
        Exception: if yt-dlp is unavailable, the video is not downloadable,
            or the download fails. The message is user-facing and the
            original error is attached as ``__cause__``.
    """
    if not YT_DLP_AVAILABLE:
        raise Exception("yt-dlp is not available. Please check the installation.")

    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, _AUDIO_STEM)

        with YoutubeDL(_build_download_options(output_path)) as ydl:
            # Only the metadata lookup is allowed to fail softly. The
            # availability verdicts below must NOT be swallowed by this
            # handler, otherwise a "private"/"live" video would still be
            # downloaded through the fallback path.
            try:
                info_dict = ydl.extract_info(url, download=False)
            except Exception as extract_error:
                print(f"Info extraction failed: {extract_error}")
                print("Attempting direct download...")
                info_dict = None

            if info_dict:
                _ensure_downloadable(info_dict)

            ydl.download([url])

        downloaded = _find_downloaded_file(temp_dir, _AUDIO_STEM)
        if downloaded is None:
            raise FileNotFoundError("Downloaded audio file not found")

        print(f"Successfully downloaded: {downloaded}")
        return downloaded

    except Exception as e:
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)
        raise Exception(_friendly_download_error(str(e))) from e


def test_video_access(url: str) -> str:
    """Check whether ``url`` is reachable without downloading anything.

    Returns a human-readable, multi-line status string; never raises.
    """
    if not YT_DLP_AVAILABLE:
        return "❌ Video access test failed: yt-dlp is not available"

    try:
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'skip_download': True,
            # The Python API expects {extractor: {argument: [values]}};
            # the CLI-style "skip=authcheck" string is not parsed here.
            'extractor_args': {'youtubetab': {'skip': ['authcheck']}},
        }
        cookies_path = get_cookies_path()
        if cookies_path:
            ydl_opts['cookiefile'] = cookies_path

        with YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=False) or {}

        details = []
        title = info_dict.get('title')
        if title:
            # Only add an ellipsis when the title was actually shortened.
            shown = title if len(title) <= 50 else f"{title[:50]}..."
            details.append(f"Title: {shown}")
        if info_dict.get('duration'):
            details.append(f"Duration: {info_dict['duration']} seconds")
        if info_dict.get('availability'):
            details.append(f"Availability: {info_dict['availability']}")
        if info_dict.get('age_limit'):
            details.append(f"Age limit: {info_dict['age_limit']}+")

        return "✅ Video accessible" + "\n" + "\n".join(details)

    except Exception as e:
        return f"❌ Video access test failed: {str(e)}"


# The name starts with "test_" but this is application code; keep pytest
# from collecting it as a test when the module is star-imported.
test_video_access.__test__ = False


# --------------------------------------------------------------------------
# Transcription
# --------------------------------------------------------------------------

@functools.lru_cache(maxsize=1)
def _load_whisper_model(name: str = "tiny"):
    """Load a Whisper model once and reuse it for later transcriptions."""
    return whisper.load_model(name)


def transcribe_audio(file_path: str) -> str:
    """Transcribe ``file_path`` with Whisper and return the transcript text.

    The smallest model ("tiny") is used to keep memory usage low.

    Raises:
        Exception: if Whisper is unavailable or transcription fails.
    """
    if not WHISPER_AVAILABLE:
        raise Exception("OpenAI Whisper is not available. Please check the installation.")

    try:
        model = _load_whisper_model()
        result = model.transcribe(file_path)
        return result["text"]
    except Exception as e:
        raise Exception(f"Failed to transcribe audio: {str(e)}") from e


# --------------------------------------------------------------------------
# Extraction
# --------------------------------------------------------------------------

def _unique_in_order(items, limit: int = 10) -> list:
    """Return the first ``limit`` distinct items, keeping first-seen order."""
    return list(dict.fromkeys(items))[:limit]


def extract_stock_info_simple(text: str) -> str:
    """Summarise stock-related mentions found in ``text``.

    Purely heuristic pattern matching: capitalised words are treated as
    possible company names and 2-5 letter upper-case tokens as possible
    ticker symbols, so false positives are expected.

    Returns a formatted report. Never raises; on an unexpected error a
    string starting with "Error extracting stock info:" is returned.
    """
    try:
        companies = _unique_in_order(_COMPANY_RE.findall(text))
        symbols = _unique_in_order(_SYMBOL_RE.findall(text))
        prices = _unique_in_order(_PRICE_RE.findall(text))
        # Lower-case so that "Buy" and "buy" are reported once.
        actions = _unique_in_order(
            match.lower() for match in _ACTION_RE.findall(text)
        )

        parts = ["=== EXTRACTED STOCK INFORMATION ===\n\n"]

        if companies:
            parts.append(f"📊 Mentioned Companies: {', '.join(companies)}\n\n")
        if symbols:
            parts.append(f"🔤 Potential Stock Symbols: {', '.join(symbols)}\n\n")
        if prices:
            parts.append(f"💰 Price Mentions: {', '.join(prices)}\n\n")
        if actions:
            parts.append(f"📈 Trading Actions: {', '.join(actions)}\n\n")

        # A sentence counts as a recommendation when it contains both a
        # trading keyword and one of the first few detected symbols.
        top_symbols = symbols[:5]
        recommendations = []
        for sentence in _SENTENCE_SPLIT_RE.split(text):
            lowered = sentence.lower()
            if not any(word in lowered for word in _RECOMMENDATION_WORDS):
                continue
            if any(symbol in sentence for symbol in top_symbols):
                recommendations.append(sentence.strip())

        if recommendations:
            parts.append("🎯 Potential Recommendations:\n")
            parts.extend(f"• {rec}\n" for rec in recommendations[:5])

        if not any([companies, symbols, prices, actions]):
            parts.append("⚠️ No clear stock recommendations found in the transcript.\n")
            parts.append("This might be because:\n")
            parts.append("• The video doesn't contain stock recommendations\n")
            parts.append("• The audio quality was poor\n")
            parts.append("• The content is not in English\n")

        return "".join(parts)

    except Exception as e:
        return f"Error extracting stock info: {str(e)}"


# --------------------------------------------------------------------------
# Housekeeping and diagnostics
# --------------------------------------------------------------------------

def cleanup_file(file_path: Optional[str]) -> None:
    """Best-effort removal of ``file_path`` and its directory if then empty.

    Filesystem errors are ignored on purpose: cleanup must never mask the
    result (or the error) of the request that triggered it.
    """
    if not file_path:
        return

    try:
        if os.path.exists(file_path):
            os.remove(file_path)
    except OSError:
        return

    try:
        # rmdir only succeeds on an empty directory, which is what we want.
        os.rmdir(os.path.dirname(file_path))
    except OSError:
        pass


def system_test() -> str:
    """Run quick self-checks and return one status line per check."""
    results = []

    # Test yt-dlp
    if YT_DLP_AVAILABLE:
        results.append("✅ yt-dlp: Available")
        try:
            with YoutubeDL({'quiet': True}):
                pass
            results.append("✅ yt-dlp: Can create YoutubeDL instance")
        except Exception as e:
            results.append(f"❌ yt-dlp: Cannot create instance - {e}")
    else:
        results.append("❌ yt-dlp: Not available")

    # Test Whisper
    if WHISPER_AVAILABLE:
        results.append("✅ Whisper: Available (Type: openai-whisper)")
        try:
            import whisper  # noqa: F401 - re-import doubles as a sanity check
            results.append("✅ Whisper: OpenAI Whisper can be imported")
        except Exception as e:
            results.append(f"❌ Whisper: Cannot import - {e}")
    else:
        results.append("❌ Whisper: Not available")

    # Test file operations
    try:
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file.write(b"test")
        os.remove(temp_file.name)
        results.append("✅ File operations: Working")
    except Exception as e:
        results.append(f"❌ File operations: Failed - {e}")

    # Test cookies
    cookies_path = get_cookies_path()
    if cookies_path:
        results.append(f"✅ Cookies: Found at {cookies_path}")
    else:
        results.append("⚠️ Cookies: Not found (may cause bot detection issues)")

    return "\n".join(results)


# --------------------------------------------------------------------------
# Pipeline entry point
# --------------------------------------------------------------------------

def _is_youtube_url(url: str) -> bool:
    """Return True when the host of ``url`` is YouTube or a sub-domain of it.

    The host name is parsed instead of substring-matching the whole URL,
    so e.g. ``https://evil.example/?q=youtube.com`` is rejected.
    """
    candidate = url if "://" in url else f"https://{url}"
    try:
        host = (urlparse(candidate).hostname or "").lower()
    except ValueError:
        return False
    return any(
        host == domain or host.endswith("." + domain)
        for domain in _YOUTUBE_HOSTS
    )


def process_video(url, progress=gr.Progress() if GRADIO_AVAILABLE else None):
    """Download, transcribe and analyse a YouTube video.

    Args:
        url: YouTube URL entered by the user.
        progress: Gradio progress tracker, injected by Gradio; ``None``
            when gradio is not installed.

    Returns:
        ``(transcript, stock_details)``. On any failure the first element
        carries the error message and the second is an empty string.
    """
    if not YT_DLP_AVAILABLE:
        return "Error: yt-dlp is not installed properly. Please check the requirements.", ""
    if not WHISPER_AVAILABLE:
        return "Error: OpenAI Whisper is not installed properly. Please check the requirements.", ""

    if not url or not url.strip():
        return "Please provide a valid YouTube URL", ""
    url = url.strip()
    if not _is_youtube_url(url):
        return "Please provide a valid YouTube URL", ""

    def report(fraction, desc):
        if progress is not None:
            progress(fraction, desc=desc)

    audio_path = None
    try:
        report(0.1, "Downloading audio...")
        audio_path = download_audio(url)

        report(0.5, "Transcribing audio...")
        transcript = transcribe_audio(audio_path)
        if not transcript.strip():
            return "No speech detected in the video", ""

        report(0.8, "Extracting stock information...")
        stock_details = extract_stock_info_simple(transcript)

        report(1.0, "Complete!")
        return transcript, stock_details

    except Exception as e:
        return f"Error processing video: {str(e)}", ""

    finally:
        # Clean up temporary files
        cleanup_file(audio_path)


# --------------------------------------------------------------------------
# Gradio interface
# --------------------------------------------------------------------------

def build_interface():
    """Create and return the Gradio ``Blocks`` application."""
    with gr.Blocks(
        title="Stock Recommendation Extractor",
        theme=gr.themes.Soft(),
        css=_APP_CSS,
    ) as app:
        gr.Markdown(_INTRO_MARKDOWN)

        # System test section
        with gr.Accordion("🧪 System Status", open=False):
            system_status = gr.Textbox(
                value=system_test(),
                label="System Test Results",
                lines=10,
                interactive=False,
            )
            rerun_btn = gr.Button("🔄 Re-run System Test")
            rerun_btn.click(fn=system_test, outputs=system_status)

        with gr.Row():
            with gr.Column(scale=1):
                url_input = gr.Textbox(
                    label="📺 YouTube URL",
                    placeholder="https://www.youtube.com/watch?v=...",
                    lines=2,
                )

                with gr.Row():
                    process_btn = gr.Button(
                        "🚀 Extract Stock Information",
                        variant="primary",
                        size="lg",
                    )
                    access_btn = gr.Button(
                        "🔍 Test Video Access",
                        variant="secondary",
                    )

                test_result = gr.Textbox(
                    label="📋 Video Access Test",
                    lines=4,
                    visible=False,
                )

                gr.Markdown(_TIPS_MARKDOWN)

        def check_video_access(url):
            # Always reveal the box: the "enter a URL" hint is useless if
            # it is written into a hidden component.
            if not url or not url.strip():
                message = "Please enter a YouTube URL first"
            else:
                message = test_video_access(url.strip())
            return gr.update(value=message, visible=True)

        access_btn.click(
            fn=check_video_access,
            inputs=[url_input],
            outputs=[test_result],
        )

        with gr.Row():
            with gr.Column():
                transcript_output = gr.Textbox(
                    label="📝 Full Transcript",
                    lines=15,
                    max_lines=20,
                    show_copy_button=True,
                )

            with gr.Column():
                stock_info_output = gr.Textbox(
                    label="📊 Extracted Stock Information",
                    lines=15,
                    max_lines=20,
                    show_copy_button=True,
                )

        # Event handlers
        process_btn.click(
            fn=process_video,
            inputs=[url_input],
            outputs=[transcript_output, stock_info_output],
            show_progress=True,
        )

        # Example section
        gr.Markdown("### 📋 Example URLs (Replace with actual financial videos)")
        gr.Examples(
            examples=[
                ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],
            ],
            inputs=[url_input],
            label="Click to try example",
        )

    return app


# Module-level handle, built eagerly as before; None only when gradio is
# missing so the helper functions above remain importable.
demo = build_interface() if GRADIO_AVAILABLE else None


if __name__ == "__main__":
    if demo is None:
        sys.exit("gradio is not installed; cannot launch the web interface.")
    demo.launch()