| """ |
| Professional Finance News Monitor using Playwright |
| Real-time Twitter/X scraping without authentication |
| Optimized for low-latency trading decisions |
| """ |
|
|
| import pandas as pd |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Optional |
| import streamlit as st |
| import re |
| import logging |
| from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError |
|
|
| |
# Module-level logging; basicConfig is a no-op if the host app already
# configured handlers on the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Playwright is an optional dependency: when it is missing, the monitor
# degrades gracefully to mock data instead of crashing at import time.
try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
    PLAYWRIGHT_AVAILABLE = True
except ImportError:
    PLAYWRIGHT_AVAILABLE = False
    logger.warning("playwright not available. Install with: pip install playwright && playwright install chromium")
|
|
|
|
class TwitterFinanceMonitor:
    """
    Professional-grade financial news aggregator using Playwright.

    Scrapes recent tweets from the public Twitter/X profiles listed in
    ``SOURCES`` (no authentication), then categorizes, sentiment-scores,
    impact-scores and deduplicates them for low-latency trading dashboards.
    Falls back to mock data when Playwright is unavailable or every scrape
    fails.
    """

    # Curated source accounts. ``weight`` (~1.5-1.9) expresses how
    # market-moving the source tends to be and feeds into impact scoring;
    # ``specialization`` boosts the matching category during classification.
    SOURCES = {
        'walter_bloomberg': {
            'handle': 'WalterBloomberg',
            'url': 'https://x.com/WalterBloomberg',
            'weight': 1.9,
            'specialization': ['macro', 'markets', 'geopolitical']
        },
        'fxhedge': {
            'handle': 'Fxhedgers',
            'url': 'https://x.com/Fxhedgers',
            'weight': 1.7,
            'specialization': ['macro', 'markets']
        },
        'deitaone': {
            'handle': 'DeItaone',
            'url': 'https://x.com/DeItaone',
            'weight': 1.8,
            'specialization': ['markets', 'macro']
        },
        'firstsquawk': {
            'handle': 'FirstSquawk',
            'url': 'https://x.com/FirstSquawk',
            'weight': 1.7,
            'specialization': ['markets', 'macro']
        },
        'livesquawk': {
            'handle': 'LiveSquawk',
            'url': 'https://x.com/LiveSquawk',
            'weight': 1.7,
            'specialization': ['markets', 'macro']
        },
        'reuters': {
            'handle': 'Reuters',
            'url': 'https://x.com/Reuters',
            'weight': 1.9,
            'specialization': ['geopolitical', 'macro', 'markets']
        },
        'bloomberg': {
            'handle': 'business',
            'url': 'https://x.com/business',
            'weight': 1.9,
            'specialization': ['markets', 'macro']
        },
        'ft': {
            'handle': 'FT',
            'url': 'https://x.com/FT',
            'weight': 1.8,
            'specialization': ['markets', 'macro', 'geopolitical']
        },
        'wsj': {
            'handle': 'WSJ',
            'url': 'https://x.com/WSJ',
            'weight': 1.8,
            'specialization': ['markets', 'macro', 'geopolitical']
        },
        'cnbc': {
            'handle': 'CNBC',
            'url': 'https://x.com/CNBC',
            'weight': 1.6,
            'specialization': ['markets', 'macro']
        },
        'bbcbusiness': {
            'handle': 'BBCBusiness',
            'url': 'https://x.com/BBCBusiness',
            'weight': 1.7,
            'specialization': ['geopolitical', 'macro', 'markets']
        },
        'zerohedge': {
            'handle': 'zerohedge',
            'url': 'https://x.com/zerohedge',
            'weight': 1.5,
            'specialization': ['macro', 'geopolitical', 'markets']
        },
        'marketwatch': {
            'handle': 'MarketWatch',
            'url': 'https://x.com/MarketWatch',
            'weight': 1.6,
            'specialization': ['markets', 'macro']
        },
        'unusual_whales': {
            'handle': 'unusual_whales',
            'url': 'https://x.com/unusual_whales',
            'weight': 1.5,
            'specialization': ['markets']
        },
        'financialtimes': {
            'handle': 'FinancialTimes',
            'url': 'https://x.com/FinancialTimes',
            'weight': 1.8,
            'specialization': ['markets', 'macro', 'geopolitical']
        },
        'economics': {
            'handle': 'economics',
            'url': 'https://x.com/economics',
            'weight': 1.7,
            'specialization': ['macro', 'geopolitical']
        },
        'ap': {
            'handle': 'AP',
            'url': 'https://x.com/AP',
            'weight': 1.7,
            'specialization': ['geopolitical', 'macro']
        },
        'afp': {
            'handle': 'AFP',
            'url': 'https://x.com/AFP',
            'weight': 1.7,
            'specialization': ['geopolitical', 'macro']
        },
        'ajenglish': {
            'handle': 'AJEnglish',
            'url': 'https://x.com/AJEnglish',
            'weight': 1.6,
            'specialization': ['geopolitical', 'macro']
        }
    }

    # Keyword vocabularies for the three categories used by _categorize_text.
    MACRO_KEYWORDS = [
        'Fed', 'ECB', 'BoE', 'BoJ', 'FOMC', 'Powell', 'Lagarde',
        'interest rate', 'inflation', 'CPI', 'PPI', 'GDP',
        'unemployment', 'jobs report', 'NFP', 'central bank',
        'monetary policy', 'quantitative', 'recession'
    ]

    MARKET_KEYWORDS = [
        'S&P', 'Dow', 'Nasdaq', 'Russell', 'stocks', 'equities',
        'earnings', 'revenue', 'profit', 'shares', 'IPO',
        'merger', 'acquisition', 'crypto', 'Bitcoin', 'Ethereum',
        'oil', 'gold', 'commodities', 'futures', 'options'
    ]

    GEOPOLITICAL_KEYWORDS = [
        'war', 'conflict', 'sanctions', 'trade', 'tariff',
        'China', 'Russia', 'Ukraine', 'Taiwan', 'Middle East',
        'election', 'government', 'military', 'diplomatic',
        'treaty', 'EU', 'Brexit', 'OPEC'
    ]

    def __init__(self):
        """Initialize the monitor and locate a usable Chromium binary."""
        self.chromium_path = self._find_chromium()

    def _find_chromium(self) -> str:
        """Return the first existing Chromium executable path.

        Checks well-known Linux locations and PATH. Falls back to
        '/usr/bin/chromium' (which may not exist) so that a launch failure
        surfaces at scrape time with a clear Playwright error.
        """
        import os
        import shutil

        candidates = [
            '/usr/bin/chromium',
            '/usr/bin/chromium-browser',
            '/usr/lib/chromium/chromium',
            shutil.which('chromium'),
            shutil.which('chromium-browser'),
        ]

        for path in candidates:
            if path and os.path.exists(path):
                logger.info(f"Found Chromium at: {path}")
                return path

        logger.warning("Chromium not found in standard paths")
        return '/usr/bin/chromium'

    @staticmethod
    def _count_keywords(text_lower: str, keywords: List[str]) -> int:
        """Count how many *keywords* occur in *text_lower* as whole words/phrases.

        FIX: the original code used plain substring tests, so e.g. 'up'
        matched inside 'update', 'rate' inside 'corporate' and 'war' inside
        'award'. Lookarounds are used instead of \\b so keywords containing
        non-word characters (e.g. 'S&P') still match correctly. Note this
        trades a little recall (e.g. 'fed' no longer matches 'federal') for
        much better precision.
        """
        return sum(
            1 for kw in keywords
            if re.search(r'(?<!\w)' + re.escape(kw.lower()) + r'(?!\w)', text_lower)
        )

    def _scrape_twitter_profile(self, source_name: str, source_info: Dict, timeout: int = 30) -> List[Dict]:
        """Scrape up to 15 recent tweets from one public profile.

        Args:
            source_name: Key from ``SOURCES`` (used for logging and ids).
            source_info: Source entry with 'url', 'handle', 'weight',
                'specialization'.
            timeout: Page-load timeout in seconds.

        Returns:
            List of normalized news-item dicts; empty list on any failure
            (failures are logged, never raised).
        """
        if not PLAYWRIGHT_AVAILABLE:
            logger.warning("Playwright not available")
            return []

        try:
            with sync_playwright() as p:
                browser = p.chromium.launch(
                    executable_path=self.chromium_path,
                    headless=True,
                    args=[
                        '--disable-blink-features=AutomationControlled',
                        '--disable-dev-shm-usage',
                        '--no-sandbox',
                        '--disable-setuid-sandbox',
                        '--disable-gpu',
                        '--disable-software-rasterizer'
                    ]
                )
                # FIX: try/finally guarantees the browser is closed on every
                # exit path (the original only closed it on the happy path
                # and the selector-timeout path).
                try:
                    context = browser.new_context(
                        user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
                    )
                    page = context.new_page()

                    # Block heavy resources - only the DOM text is needed,
                    # which keeps latency low.
                    def route_intercept(route):
                        if route.request.resource_type in ["image", "media", "font", "stylesheet", "video"]:
                            route.abort()
                        else:
                            route.continue_()

                    page.route("**/*", route_intercept)

                    logger.info(f"Scraping {source_name}...")
                    page.goto(source_info['url'], timeout=timeout * 1000, wait_until="domcontentloaded")

                    # Tweets render asynchronously; wait for the first article.
                    try:
                        page.wait_for_selector("article", timeout=15000)
                    except PlaywrightTimeoutError:
                        logger.warning(f"Timeout waiting for tweets from {source_name}")
                        return []

                    tweet_elements = page.locator("article div[data-testid='tweetText']").all()

                    news_items = []
                    for idx, element in enumerate(tweet_elements[:15]):
                        try:
                            text = element.text_content()
                            if not text or len(text) < 10:
                                continue

                            # Collapse whitespace runs into single spaces.
                            text = re.sub(r'\s+', ' ', text.strip())

                            # Skip retweets and replies.
                            if text.startswith('RT @') or text.startswith('@'):
                                continue

                            category = self._categorize_text(text, source_info['specialization'])
                            sentiment = self._analyze_sentiment(text)
                            impact = self._assess_impact(source_info['weight'], text)
                            is_breaking = self._detect_breaking_news(text)
                            summary = self._extract_summary(text) if len(text) > 150 else text

                            news_items.append({
                                # NOTE: hash() is randomized per process, so ids
                                # are only unique within a single run.
                                'id': hash(f"{source_name}_{idx}_{datetime.now().isoformat()}"),
                                'title': text,
                                'summary': summary,
                                'source': source_info['handle'],
                                'category': category,
                                # Approximate recency: the profile lists newest
                                # first, so stagger timestamps by list position.
                                'timestamp': datetime.now() - timedelta(minutes=idx),
                                'sentiment': sentiment,
                                'impact': impact,
                                'url': source_info['url'],
                                'likes': 0,
                                'retweets': 0,
                                'is_breaking': is_breaking,
                                'source_weight': source_info['weight'],
                                'from_web': True
                            })

                        except Exception as e:
                            # One bad tweet must not abort the whole profile.
                            logger.debug(f"Error parsing tweet from {source_name}: {e}")
                            continue

                    logger.info(f"Scraped {len(news_items)} tweets from {source_name}")
                    return news_items
                finally:
                    browser.close()

        except Exception as e:
            logger.error(f"Error scraping {source_name}: {e}")
            return []

    def scrape_twitter_news(self, max_tweets: int = 100) -> List[Dict]:
        """
        Scrape latest financial news from all sources in parallel.

        Args:
            max_tweets: Maximum number of items to return.

        Returns:
            Deduplicated items sorted breaking-first, then high-impact,
            then most recent. Mock data is returned when Playwright is
            missing or every scrape failed.
        """
        if not PLAYWRIGHT_AVAILABLE:
            logger.info("Playwright not available - using mock data")
            return self._get_mock_news()

        all_news = []
        seen_texts = set()

        # Process highest-weight sources first so they win ties during dedup.
        sorted_sources = sorted(
            self.SOURCES.items(),
            key=lambda x: x[1]['weight'],
            reverse=True
        )

        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = [
                (executor.submit(self._scrape_twitter_profile, name, info, timeout=30), name)
                for name, info in sorted_sources
            ]

            for future, source_name in futures:
                try:
                    # NOTE: queued tasks only start once a worker frees up, so
                    # this per-future timeout can also fire on queue delay; a
                    # timed-out task keeps running until executor shutdown.
                    news_items = future.result(timeout=35)

                    # FIX: dedup on the truncated title string itself rather
                    # than its hash() - a hash collision would silently drop
                    # a genuinely new headline.
                    unique_items = []
                    for item in news_items:
                        key = item['title'][:100]
                        if key not in seen_texts:
                            seen_texts.add(key)
                            unique_items.append(item)

                    all_news.extend(unique_items)
                    if unique_items:
                        logger.info(f"Fetched {len(unique_items)} unique tweets from {source_name}")

                except FuturesTimeoutError:
                    logger.warning(f"Timeout scraping {source_name} - skipping")
                except Exception as e:
                    logger.error(f"Error processing {source_name}: {e}")

        if not all_news:
            logger.warning("No tweets fetched - using mock data")
            return self._get_mock_news()

        # Breaking news first, then high impact, then most recent.
        all_news.sort(
            key=lambda x: (x['is_breaking'], x['impact'] == 'high', x['timestamp']),
            reverse=True
        )

        logger.info(f"Total unique tweets: {len(all_news)}")
        return all_news[:max_tweets]

    def _categorize_text(self, text: str, source_specialization: List[str]) -> str:
        """Return 'macro', 'markets' or 'geopolitical' for *text*.

        Scores each category by whole-word keyword hits, boosts categories
        the source specializes in by 1.5x, and returns the highest scorer
        (ties resolve in macro/markets/geopolitical order).
        """
        text_lower = text.lower()

        macro_score = self._count_keywords(text_lower, self.MACRO_KEYWORDS)
        market_score = self._count_keywords(text_lower, self.MARKET_KEYWORDS)
        geo_score = self._count_keywords(text_lower, self.GEOPOLITICAL_KEYWORDS)

        # Source specialization boosts the matching categories.
        if 'macro' in source_specialization:
            macro_score *= 1.5
        if 'markets' in source_specialization:
            market_score *= 1.5
        if 'geopolitical' in source_specialization:
            geo_score *= 1.5

        scores = {'macro': macro_score, 'markets': market_score, 'geopolitical': geo_score}
        return max(scores, key=scores.get)

    def _analyze_sentiment(self, text: str) -> str:
        """Return 'positive', 'negative' or 'neutral' via keyword counting.

        Uses whole-word matching so that e.g. 'up' does not fire inside
        'update' or 'supply' (that substring bug skewed the original toward
        'positive').
        """
        text_lower = text.lower()

        positive_keywords = ['surge', 'rally', 'gain', 'rise', 'up', 'bullish', 'strong', 'beat', 'exceed']
        negative_keywords = ['crash', 'plunge', 'fall', 'down', 'bearish', 'weak', 'miss', 'below', 'loss']

        pos_count = self._count_keywords(text_lower, positive_keywords)
        neg_count = self._count_keywords(text_lower, negative_keywords)

        if pos_count > neg_count:
            return 'positive'
        elif neg_count > pos_count:
            return 'negative'
        return 'neutral'

    def _assess_impact(self, source_weight: float, text: str) -> str:
        """Return 'high', 'medium' or 'low' market impact.

        Combines the source weight with 0.3 per whole-word high-impact
        keyword hit; thresholds: >=1.8 high, >=1.4 medium.
        """
        text_lower = text.lower()

        high_impact_keywords = ['breaking', 'alert', 'urgent', 'flash', 'fed', 'powell', 'rate', 'war']
        impact_score = self._count_keywords(text_lower, high_impact_keywords)

        total_impact = source_weight + (impact_score * 0.3)

        if total_impact >= 1.8:
            return 'high'
        elif total_impact >= 1.4:
            return 'medium'
        return 'low'

    def _detect_breaking_news(self, text: str) -> bool:
        """Return True if the text looks like breaking/urgent news.

        Deliberately uses substring matching so markers glued to the text
        (e.g. a leading siren emoji) are still caught. The original's
        '*breaking*' entry was removed as redundant: the plain 'breaking'
        substring already covers it.
        """
        text_lower = text.lower()
        breaking_keywords = ['breaking', 'alert', 'urgent', 'flash', '🚨']
        return any(kw in text_lower for kw in breaking_keywords)

    def _extract_summary(self, text: str) -> str:
        """Return *text* unchanged if <=150 chars, else a 150-char ellipsized cut."""
        if len(text) <= 150:
            return text
        return text[:147] + "..."

    def _get_mock_news(self) -> List[Dict]:
        """Return two placeholder items (same schema as scraped ones) for fallback."""
        mock_news = [
            {
                'id': hash('mock1'),
                'title': 'Fed signals potential rate pause as inflation moderates',
                'summary': 'Fed signals potential rate pause as inflation moderates',
                'source': 'Mock Data',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(minutes=5),
                'sentiment': 'neutral',
                'impact': 'high',
                'url': 'https://x.com',
                'likes': 0,
                'retweets': 0,
                'is_breaking': False,
                'source_weight': 1.5,
                'from_web': True
            },
            {
                'id': hash('mock2'),
                'title': 'S&P 500 futures rise ahead of key earnings reports',
                'summary': 'S&P 500 futures rise ahead of key earnings reports',
                'source': 'Mock Data',
                'category': 'markets',
                'timestamp': datetime.now() - timedelta(minutes=15),
                'sentiment': 'positive',
                'impact': 'medium',
                'url': 'https://x.com',
                'likes': 0,
                'retweets': 0,
                'is_breaking': False,
                'source_weight': 1.5,
                'from_web': True
            }
        ]
        return mock_news

    def get_statistics(self) -> Dict:
        """
        Get statistics about cached news.

        Note: statistics are now managed by NewsCacheManager; this method
        returns empty stats for backward compatibility only.
        """
        return {
            'total': 0,
            'high_impact': 0,
            'breaking': 0,
            'last_update': 'Managed by cache',
            'by_category': {}
        }
|
|