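# NOTE: "Spaces: Sleeping" was the Hugging Face Spaces status banner captured
# when this file was scraped; it is not part of the program.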
import os
import re
import shutil
import sys
import tempfile
from urllib.parse import urlsplit

import gradio as gr
# Optional third-party dependencies. A failed import is recorded in a flag
# (and printed) instead of aborting, so the rest of the module can still load
# and report what is missing.
try:
    from yt_dlp import YoutubeDL
except ImportError as e:
    YT_DLP_AVAILABLE = False
    print(f"yt-dlp import error: {e}")
else:
    YT_DLP_AVAILABLE = True

try:
    import whisper
except ImportError as e:
    WHISPER_AVAILABLE = False
    print(f"whisper import error: {e}")
else:
    WHISPER_AVAILABLE = True

# Start-up diagnostics written to the process log.
print(f"Python version: {sys.version}")
print(f"yt-dlp available: {YT_DLP_AVAILABLE}")
print(f"whisper available: {WHISPER_AVAILABLE}")
def get_cookies_path():
    """Locate an optional ``cookies.txt`` file.

    The current working directory is checked first, then the directory that
    contains this script.

    Returns:
        ``'cookies.txt'`` (relative) when the file exists in the working
        directory, the script-directory path when it exists there, otherwise
        ``None``.
    """
    cookie_name = 'cookies.txt'
    if os.path.exists(cookie_name):
        return cookie_name

    beside_script = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), cookie_name
    )
    return beside_script if os.path.exists(beside_script) else None
def _build_download_options(output_path, cookies_path):
    """Return the yt-dlp option dict used to fetch one audio track."""
    ydl_opts = {
        'format': 'bestaudio[ext=m4a]/bestaudio/best',
        'outtmpl': output_path + '.%(ext)s',
        'quiet': True,
        'no_warnings': True,
        'extract_flat': False,
        'ignoreerrors': False,
        # Retry transient failures; HTTP retries sleep 2**n seconds, capped at 30.
        'extractor_retries': 3,
        'fragment_retries': 3,
        'retry_sleep_functions': {'http': lambda n: min(2 ** n, 30)},
        'geo_bypass': True,
        'geo_bypass_country': 'US',
        # Browser-like headers, intended to avoid bot detection. This key used
        # to be defined twice in the literal, so only this full set ever applied.
        'http_headers': {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-us,en;q=0.5',
            'Accept-Encoding': 'gzip,deflate',
            'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        },
    }
    if cookies_path:
        ydl_opts['cookiefile'] = cookies_path
        print(f"Using cookies from: {cookies_path}")
    else:
        print("No cookies.txt found - proceeding without cookies")
    return ydl_opts


def _reject_undownloadable(info_dict):
    """Raise ``Exception`` when the metadata says the video cannot be fetched."""
    availability_errors = {
        'private': "Video is private and cannot be accessed",
        'premium_only': "Video requires premium subscription",
        'subscriber_only': "Video is only available to channel subscribers",
        'needs_auth': "Video requires authentication - try updating cookies",
    }
    availability = info_dict.get('availability')
    if availability in availability_errors:
        raise Exception(availability_errors[availability])

    live_status = info_dict.get('live_status')
    if live_status == 'is_live':
        raise Exception("Cannot download live streams")
    if live_status == 'was_live':
        print("Note: This was a live stream, trying to download recorded version...")


def _find_downloaded_audio(temp_dir, output_path):
    """Return the path of the file yt-dlp wrote, or ``None`` if there is none."""
    for ext in ('.m4a', '.webm', '.mp4', '.mp3'):
        candidate = output_path + ext
        if os.path.exists(candidate):
            return candidate
    # The 'bestaudio/best' fallbacks may select a container outside the list
    # above, so accept any finished "audio.<ext>" file in the download folder.
    stem = os.path.basename(output_path) + '.'
    for name in sorted(os.listdir(temp_dir)):
        if name.startswith(stem) and not name.endswith(('.part', '.ytdl')):
            return os.path.join(temp_dir, name)
    return None


def _describe_download_error(error_msg):
    """Translate a raw error message into the user-facing message."""
    if "Sign in to confirm your age" in error_msg:
        return "β Video is age-restricted. Please use a different video or update your cookies with an authenticated session."
    if "Private video" in error_msg:
        return "β Video is private and cannot be accessed."
    if "This video is unavailable" in error_msg or "Video unavailable" in error_msg:
        return "β Video is unavailable. This could be due to:\nβ’ Geographic restrictions\nβ’ Content removed by uploader\nβ’ Copyright issues\nβ’ Try a different video"
    if "This content isn't available" in error_msg:
        return "β Content not available in your region or has been restricted. Try:\nβ’ Using a VPN\nβ’ Different video\nβ’ Updating cookies"
    if "blocked" in error_msg.lower():
        return "β Access blocked. Try using updated cookies or a different video."
    if "HTTP Error 403" in error_msg:
        return "β Access forbidden. Video may be region-locked or require authentication."
    if "HTTP Error 404" in error_msg:
        return "β Video not found. It may have been deleted or the URL is incorrect."
    return f"β Download failed: {error_msg}"


def download_audio(url):
    """Download the audio track of a video into a fresh temporary directory.

    Args:
        url: Video URL handed to yt-dlp.

    Returns:
        Path of the downloaded audio file. The caller owns the file and its
        temporary directory (see ``cleanup_file``).

    Raises:
        Exception: If yt-dlp is unavailable, the metadata marks the video as
            private / premium / subscriber-only / auth-gated / currently live,
            or the download fails. The message is user-facing and the
            original error is attached as ``__cause__``.
    """
    if not YT_DLP_AVAILABLE:
        raise Exception("yt-dlp is not available. Please check the installation.")

    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "audio")
        ydl_opts = _build_download_options(output_path, get_cookies_path())

        with YoutubeDL(ydl_opts) as ydl:
            # Only a failed metadata probe falls back to a blind download.
            # The availability verdicts below are raised outside this
            # try-block so they are no longer swallowed and retried.
            try:
                info_dict = ydl.extract_info(url, download=False)
            except Exception as extract_error:
                print(f"Info extraction failed: {extract_error}")
                print("Attempting direct download...")
                info_dict = None
            if info_dict:
                _reject_undownloadable(info_dict)
            ydl.download([url])

        audio_file = _find_downloaded_audio(temp_dir, output_path)
        if audio_file is None:
            raise FileNotFoundError("Downloaded audio file not found")
        print(f"Successfully downloaded: {audio_file}")
        return audio_file
    except Exception as e:
        # Nothing is returned on failure, so the caller cannot clean up:
        # remove the temporary directory here.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        raise Exception(_describe_download_error(str(e))) from e
def test_video_access(url):
    """Probe a video's metadata with yt-dlp without downloading it.

    Args:
        url: Video URL to probe.

    Returns:
        A multi-line status string: a success line followed by whichever of
        title, duration, availability and age limit were reported, or a
        failure line containing the error text. Errors are reported in the
        returned string rather than raised.
    """
    try:
        # Same guard as the other yt-dlp entry points; raised inside the
        # try-block so it is formatted like every other failure.
        if not YT_DLP_AVAILABLE:
            raise Exception("yt-dlp is not available. Please check the installation.")

        cookies_path = get_cookies_path()
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'skip_download': True,
            # The Python API expects {extractor: {argument: [values]}}; the
            # CLI-style string "skip=authcheck" is not parsed here.
            'extractor_args': {'youtubetab': {'skip': ['authcheck']}},
        }
        if cookies_path:
            ydl_opts['cookiefile'] = cookies_path

        with YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=False) or {}

        status = "β Video accessible"
        details = []
        title = info_dict.get('title')
        if title:
            # Add the ellipsis only when the title was actually cut.
            shown = title if len(title) <= 50 else title[:50] + "..."
            details.append(f"Title: {shown}")
        if info_dict.get('duration'):
            details.append(f"Duration: {info_dict['duration']} seconds")
        if info_dict.get('availability'):
            details.append(f"Availability: {info_dict['availability']}")
        if info_dict.get('age_limit'):
            details.append(f"Age limit: {info_dict['age_limit']}+")
        return status + "\n" + "\n".join(details)
    except Exception as e:
        return f"β Video access test failed: {str(e)}"
def transcribe_audio(file_path):
    """Run OpenAI Whisper over an audio file and return the recognised text.

    Args:
        file_path: Path of the audio file to transcribe.

    Returns:
        The ``"text"`` entry of Whisper's transcription result.

    Raises:
        Exception: If Whisper is not installed, or if loading the model or
            transcribing fails.
    """
    if not WHISPER_AVAILABLE:
        raise Exception("OpenAI Whisper is not available. Please check the installation.")

    try:
        # The "tiny" checkpoint is used to reduce memory usage.
        return whisper.load_model("tiny").transcribe(file_path)["text"]
    except Exception as err:
        raise Exception(f"Failed to transcribe audio: {str(err)}")
def extract_stock_info_simple(text):
    """Summarise stock-related mentions in a transcript using regex heuristics.

    Capitalised word runs are reported as companies, 2-5 letter upper-case
    tokens as ticker symbols, ``$``-amounts as prices, and a fixed vocabulary
    (buy/sell/hold/...) as trading actions. Sentences that contain both an
    action keyword and one of the first five symbols are listed as potential
    recommendations.

    Each list is de-duplicated in first-seen order and capped at 10 entries,
    so the report is identical from run to run. Trading actions are
    lower-cased so "Buy" and "buy" are listed once.

    Args:
        text: Transcript text.

    Returns:
        A formatted multi-line report. If extraction fails (for example
        ``text`` is not a string) an ``"Error extracting stock info: ..."``
        string is returned instead of raising.
    """
    def _unique(items, limit=10):
        """Drop duplicates keeping first-seen order, then cap at ``limit``."""
        return list(dict.fromkeys(items))[:limit]

    try:
        companies = _unique(re.findall(
            r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?:\s+(?:Inc|Corp|Company|Ltd)\.?)?',
            text,
        ))
        symbols = _unique(re.findall(r'\b[A-Z]{2,5}\b', text))
        prices = _unique(re.findall(r'\$\d+(?:\.\d{2})?', text))
        actions = _unique(
            ' '.join(match.lower().split())
            for match in re.findall(
                r'\b(?:buy|sell|hold|bullish|bearish|target|stop\s+loss)\b',
                text,
                re.IGNORECASE,
            )
        )

        parts = ["=== EXTRACTED STOCK INFORMATION ===\n\n"]
        if companies:
            parts.append(f"π Mentioned Companies: {', '.join(companies)}\n\n")
        if symbols:
            parts.append(f"π€ Potential Stock Symbols: {', '.join(symbols)}\n\n")
        if prices:
            parts.append(f"π° Price Mentions: {', '.join(prices)}\n\n")
        if actions:
            parts.append(f"π Trading Actions: {', '.join(actions)}\n\n")

        # A sentence counts as a recommendation when it has both an action
        # keyword and one of the first few detected symbols.
        keywords = ('buy', 'sell', 'target', 'recommendation')
        leading_symbols = symbols[:5]
        recommendations = [
            sentence.strip()
            for sentence in text.split('.')
            if any(word in sentence.lower() for word in keywords)
            and any(symbol in sentence for symbol in leading_symbols)
        ]
        if recommendations:
            parts.append("π― Potential Recommendations:\n")
            parts.extend(f"β’ {rec}\n" for rec in recommendations[:5])

        if not any([companies, symbols, prices, actions]):
            parts.append("β οΈ No clear stock recommendations found in the transcript.\n")
            parts.append("This might be because:\n")
            parts.append("β’ The video doesn't contain stock recommendations\n")
            parts.append("β’ The audio quality was poor\n")
            parts.append("β’ The content is not in English\n")
        return ''.join(parts)
    except Exception as e:
        return f"Error extracting stock info: {str(e)}"
def cleanup_file(file_path):
    """Best-effort removal of a temporary file and its directory.

    Deletes ``file_path`` and then tries to remove its parent directory,
    which only succeeds when that directory is empty. Failures are reported
    on stdout or ignored; nothing is raised for filesystem errors.

    Args:
        file_path: Path of the file to delete. ``None`` or ``""`` is a no-op.
    """
    if not file_path:
        return

    try:
        os.remove(file_path)
    except FileNotFoundError:
        # Already gone: nothing to clean up.
        return
    except (OSError, TypeError, ValueError) as err:
        # Narrow handlers replace the former bare ``except:``, which also
        # swallowed KeyboardInterrupt and SystemExit.
        print(f"Could not remove temporary file {file_path}: {err}")
        return

    try:
        os.rmdir(os.path.dirname(file_path))
    except OSError:
        # Directory is not empty or cannot be removed; leave it in place.
        pass
def system_test():
    """Run quick self-checks and describe the result of each.

    Checks that yt-dlp can build a ``YoutubeDL`` instance, that Whisper can be
    imported, that a temporary file can be written and removed, and whether a
    ``cookies.txt`` file is present.

    Returns:
        One status line per check, joined with newlines.
    """
    results = []

    # yt-dlp
    if YT_DLP_AVAILABLE:
        results.append("β yt-dlp: Available")
        try:
            # Context manager so the instance is closed again right away.
            with YoutubeDL({'quiet': True}):
                pass
            results.append("β yt-dlp: Can create YoutubeDL instance")
        except Exception as e:
            results.append(f"β yt-dlp: Cannot create instance - {e}")
    else:
        results.append("β yt-dlp: Not available")

    # Whisper
    if WHISPER_AVAILABLE:
        results.append("β Whisper: Available (Type: openai-whisper)")
        try:
            import whisper
            results.append("β Whisper: OpenAI Whisper can be imported")
        except Exception as e:
            results.append(f"β Whisper: Cannot import - {e}")
    else:
        results.append("β Whisper: Not available")

    # Temporary-file round trip
    try:
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            with temp_file:
                temp_file.write(b"test")
        finally:
            # Remove the probe file even when the write fails.
            os.remove(temp_file.name)
        results.append("β File operations: Working")
    except Exception as e:
        results.append(f"β File operations: Failed - {e}")

    # Cookies
    cookies_path = get_cookies_path()
    if cookies_path:
        results.append(f"β Cookies: Found at {cookies_path}")
    else:
        results.append("β οΈ Cookies: Not found (may cause bot detection issues)")

    return "\n".join(results)
def _is_youtube_url(url):
    """Return True when ``url`` is an http(s) URL on youtube.com or youtu.be."""
    candidate = url.strip()
    # Scheme-less input such as "youtube.com/watch?v=..." is still accepted.
    if not re.match(r'[A-Za-z][A-Za-z0-9+.-]*://', candidate):
        candidate = 'https://' + candidate
    try:
        parts = urlsplit(candidate)
        host = (parts.hostname or '').lower()
    except ValueError:
        return False
    if parts.scheme not in ('http', 'https'):
        return False
    # Match the host itself or any sub-domain (www., m., music., ...). A URL
    # that only mentions "youtube.com" in its path or query is rejected.
    return any(
        host == domain or host.endswith('.' + domain)
        for domain in ('youtube.com', 'youtu.be')
    )


def process_video(url, progress=gr.Progress()):
    """Download, transcribe and analyse one YouTube video.

    Args:
        url: YouTube URL entered by the user.
        progress: Gradio progress tracker used to report each stage.

    Returns:
        A ``(transcript, stock_details)`` tuple of strings. On any failure
        the first element holds an error or notice and the second is ``""``.
    """
    # Both optional dependencies are required for the full pipeline.
    if not YT_DLP_AVAILABLE:
        return "Error: yt-dlp is not installed properly. Please check the requirements.", ""
    if not WHISPER_AVAILABLE:
        return "Error: OpenAI Whisper is not installed properly. Please check the requirements.", ""
    if not url or not url.strip():
        return "Please provide a valid YouTube URL", ""

    url = url.strip()
    audio_path = None
    try:
        # Validate the host rather than searching for a substring.
        if not _is_youtube_url(url):
            return "Please provide a valid YouTube URL", ""

        progress(0.1, desc="Downloading audio...")
        audio_path = download_audio(url)

        progress(0.5, desc="Transcribing audio...")
        transcript = transcribe_audio(audio_path)
        if not transcript.strip():
            return "No speech detected in the video", ""

        progress(0.8, desc="Extracting stock information...")
        stock_details = extract_stock_info_simple(transcript)

        progress(1.0, desc="Complete!")
        return transcript, stock_details
    except Exception as e:
        error_msg = f"Error processing video: {str(e)}"
        return error_msg, ""
    finally:
        # Always remove the downloaded audio, whatever the outcome.
        cleanup_file(audio_path)
# Create Gradio interface
# NOTE(review): the source reached review with its indentation flattened, so
# the nesting of the layout containers below is reconstructed - confirm it
# against the running app.
with gr.Blocks(
    title="Stock Recommendation Extractor",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1200px;
        margin: auto;
    }
    """
) as demo:
    gr.Markdown("""
    # π Stock Recommendation Extractor from YouTube
    Extract stock recommendations and trading information from YouTube videos using AI transcription.
    **How it works:**
    1. Downloads audio from YouTube video
    2. Transcribes using OpenAI Whisper
    3. Extracts stock-related information
    **β οΈ Disclaimer:** This is for educational purposes only. Always do your own research!
    """)

    # System self-test, computed once at start-up and re-runnable on demand.
    with gr.Accordion("π§ͺ System Status", open=False):
        system_status = gr.Textbox(
            value=system_test(),
            label="System Test Results",
            lines=10,
            interactive=False
        )
        # Named separately from the video-access button below, which
        # previously reused the same variable name.
        system_test_btn = gr.Button("π Re-run System Test")
        system_test_btn.click(fn=system_test, outputs=system_status)

    with gr.Row():
        with gr.Column(scale=1):
            url_input = gr.Textbox(
                label="πΊ YouTube URL",
                placeholder="https://www.youtube.com/watch?v=...",
                lines=2
            )
            with gr.Row():
                process_btn = gr.Button(
                    "π Extract Stock Information",
                    variant="primary",
                    size="lg"
                )
                test_btn = gr.Button(
                    "π Test Video Access",
                    variant="secondary"
                )
            # Hidden until the first access test produces a message.
            test_result = gr.Textbox(
                label="π Video Access Test",
                lines=4,
                visible=False
            )
            gr.Markdown("""
            ### π‘ Tips:
            - **First try "Test Video Access"** to check if video is available
            - Works best with financial YouTube channels
            - Ensure video has clear audio
            - English content works best
            - If you get bot detection errors, try updating cookies.txt
            ### π― Recommended Financial Channels:
            - Ben Felix, The Plain Bagel, Two Cents, Graham Stephan
            - Make sure videos are public and not age-restricted
            """)

            def test_and_show(url):
                """Run the access test and reveal the result box with its message.

                The missing-URL prompt is shown too; it used to be returned
                together with ``visible=False`` and so was never displayed.
                """
                if not url or not url.strip():
                    message = "Please enter a YouTube URL first"
                else:
                    message = test_video_access(url)
                # One update carries both value and visibility, so the
                # component is listed once in ``outputs``.
                return gr.update(value=message, visible=True)

            test_btn.click(
                fn=test_and_show,
                inputs=[url_input],
                outputs=[test_result]
            )

    with gr.Row():
        with gr.Column():
            transcript_output = gr.Textbox(
                label="π Full Transcript",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )
        with gr.Column():
            stock_info_output = gr.Textbox(
                label="π Extracted Stock Information",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )

    # Event handlers
    process_btn.click(
        fn=process_video,
        inputs=[url_input],
        outputs=[transcript_output, stock_info_output],
        show_progress=True
    )

    # Example section
    gr.Markdown("### π Example URLs (Replace with actual financial videos)")
    gr.Examples(
        examples=[
            ["https://www.youtube.com/watch?v=dQw4w9WgXcQ"],
        ],
        inputs=[url_input],
        label="Click to try example"
    )

if __name__ == "__main__":
    demo.launch()