# UnifiedFinancialPlatform/app/services/twitter_news_playwright.py
# Author: Dmitry Beresnev — "init project" (commit e189a31)
"""
Professional Finance News Monitor using Playwright
Real-time Twitter/X scraping without authentication
Optimized for low-latency trading decisions
"""
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import streamlit as st
import re
import logging
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Playwright is an optional dependency: when it is missing the monitor
# degrades to mock data (see TwitterFinanceMonitor) instead of raising
# at import time.
try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
    PLAYWRIGHT_AVAILABLE = True
except ImportError:
    PLAYWRIGHT_AVAILABLE = False
    logger.warning("playwright not available. Install with: pip install playwright && playwright install chromium")
class TwitterFinanceMonitor:
    """
    Professional-grade financial news aggregator using Playwright.

    Scrapes public Twitter/X profiles of curated financial news accounts
    (no authentication required), then categorizes, scores sentiment and
    market impact, deduplicates, and ranks the tweets for low-latency
    trading dashboards.
    """

    # Premium financial Twitter accounts, grouped by tier.
    # 'weight' (source credibility) feeds the impact score in _assess_impact;
    # 'specialization' boosts matching category scores in _categorize_text.
    SOURCES = {
        # ===== TIER 1: Breaking News Aggregators =====
        'walter_bloomberg': {
            'handle': 'WalterBloomberg',
            'url': 'https://x.com/WalterBloomberg',
            'weight': 1.9,
            'specialization': ['macro', 'markets', 'geopolitical']
        },
        'fxhedge': {
            'handle': 'Fxhedgers',
            'url': 'https://x.com/Fxhedgers',
            'weight': 1.7,
            'specialization': ['macro', 'markets']
        },
        'deitaone': {
            'handle': 'DeItaone',
            'url': 'https://x.com/DeItaone',
            'weight': 1.8,
            'specialization': ['markets', 'macro']
        },
        'firstsquawk': {
            'handle': 'FirstSquawk',
            'url': 'https://x.com/FirstSquawk',
            'weight': 1.7,
            'specialization': ['markets', 'macro']
        },
        'livesquawk': {
            'handle': 'LiveSquawk',
            'url': 'https://x.com/LiveSquawk',
            'weight': 1.7,
            'specialization': ['markets', 'macro']
        },
        # ===== TIER 2: Major News Agencies =====
        'reuters': {
            'handle': 'Reuters',
            'url': 'https://x.com/Reuters',
            'weight': 1.9,
            'specialization': ['geopolitical', 'macro', 'markets']
        },
        'bloomberg': {
            'handle': 'business',
            'url': 'https://x.com/business',
            'weight': 1.9,
            'specialization': ['markets', 'macro']
        },
        'ft': {
            'handle': 'FT',
            'url': 'https://x.com/FT',
            'weight': 1.8,
            'specialization': ['markets', 'macro', 'geopolitical']
        },
        'wsj': {
            'handle': 'WSJ',
            'url': 'https://x.com/WSJ',
            'weight': 1.8,
            'specialization': ['markets', 'macro', 'geopolitical']
        },
        'cnbc': {
            'handle': 'CNBC',
            'url': 'https://x.com/CNBC',
            'weight': 1.6,
            'specialization': ['markets', 'macro']
        },
        'bbcbusiness': {
            'handle': 'BBCBusiness',
            'url': 'https://x.com/BBCBusiness',
            'weight': 1.7,
            'specialization': ['geopolitical', 'macro', 'markets']
        },
        # ===== TIER 3: Specialized Financial Media =====
        'zerohedge': {
            'handle': 'zerohedge',
            'url': 'https://x.com/zerohedge',
            'weight': 1.5,
            'specialization': ['macro', 'geopolitical', 'markets']
        },
        'marketwatch': {
            'handle': 'MarketWatch',
            'url': 'https://x.com/MarketWatch',
            'weight': 1.6,
            'specialization': ['markets', 'macro']
        },
        'unusual_whales': {
            'handle': 'unusual_whales',
            'url': 'https://x.com/unusual_whales',
            'weight': 1.5,
            'specialization': ['markets']
        },
        'financialtimes': {
            'handle': 'FinancialTimes',
            'url': 'https://x.com/FinancialTimes',
            'weight': 1.8,
            'specialization': ['markets', 'macro', 'geopolitical']
        },
        # ===== TIER 4: Economists & Analysis =====
        'economics': {
            'handle': 'economics',
            'url': 'https://x.com/economics',
            'weight': 1.7,
            'specialization': ['macro', 'geopolitical']
        },
        'ap': {
            'handle': 'AP',
            'url': 'https://x.com/AP',
            'weight': 1.7,
            'specialization': ['geopolitical', 'macro']
        },
        'afp': {
            'handle': 'AFP',
            'url': 'https://x.com/AFP',
            'weight': 1.7,
            'specialization': ['geopolitical', 'macro']
        },
        'ajenglish': {
            'handle': 'AJEnglish',
            'url': 'https://x.com/AJEnglish',
            'weight': 1.6,
            'specialization': ['geopolitical', 'macro']
        }
    }

    # Keyword lists for categorization. Matched case-insensitively at a
    # word boundary with prefix semantics (see _count_keyword_hits), so
    # "tariff" hits "tariffs" but "war" no longer matches "forward".
    MACRO_KEYWORDS = [
        'Fed', 'ECB', 'BoE', 'BoJ', 'FOMC', 'Powell', 'Lagarde',
        'interest rate', 'inflation', 'CPI', 'PPI', 'GDP',
        'unemployment', 'jobs report', 'NFP', 'central bank',
        'monetary policy', 'quantitative', 'recession'
    ]
    MARKET_KEYWORDS = [
        'S&P', 'Dow', 'Nasdaq', 'Russell', 'stocks', 'equities',
        'earnings', 'revenue', 'profit', 'shares', 'IPO',
        'merger', 'acquisition', 'crypto', 'Bitcoin', 'Ethereum',
        'oil', 'gold', 'commodities', 'futures', 'options'
    ]
    GEOPOLITICAL_KEYWORDS = [
        'war', 'conflict', 'sanctions', 'trade', 'tariff',
        'China', 'Russia', 'Ukraine', 'Taiwan', 'Middle East',
        'election', 'government', 'military', 'diplomatic',
        'treaty', 'EU', 'Brexit', 'OPEC'
    ]

    def __init__(self):
        """Initialize the monitor and locate a Chromium executable."""
        self.chromium_path = self._find_chromium()

    def _find_chromium(self) -> str:
        """
        Find a Chromium installation path.

        Returns:
            The first existing path among the common install locations and
            anything found on PATH. Falls back to '/usr/bin/chromium' even
            when nothing exists, so the launch error surfaces at scrape time.
        """
        import os
        import shutil
        # Common Linux install locations plus whatever is on PATH.
        candidates = [
            '/usr/bin/chromium',
            '/usr/bin/chromium-browser',
            '/usr/lib/chromium/chromium',
            shutil.which('chromium'),
            shutil.which('chromium-browser'),
        ]
        for path in candidates:
            if path and os.path.exists(path):
                logger.info(f"Found Chromium at: {path}")
                return path
        logger.warning("Chromium not found in standard paths")
        return '/usr/bin/chromium'  # Fallback

    @staticmethod
    def _count_keyword_hits(text_lower: str, keywords: List[str]) -> int:
        """
        Count keywords that occur in text_lower starting at a word boundary.

        Prefix match on purpose: 'rate' hits 'rates'/'rate' but not
        'corporate'; 'up' hits 'upside' but not 'supply'/'group'.

        Args:
            text_lower: Pre-lowercased text to search.
            keywords: Keywords/phrases (any case).

        Returns:
            Number of keywords with at least one match.
        """
        return sum(
            1 for kw in keywords
            if re.search(r'\b' + re.escape(kw.lower()), text_lower)
        )

    def _scrape_twitter_profile(self, source_name: str, source_info: Dict, timeout: int = 30) -> List[Dict]:
        """
        Scrape recent tweets from one public Twitter/X profile via Playwright.

        Args:
            source_name: Key in SOURCES (used for logging and item ids).
            source_info: Config dict with 'url', 'handle', 'weight',
                'specialization'.
            timeout: Page-navigation timeout in seconds.

        Returns:
            List of normalized news-item dicts; empty list on any failure.
        """
        if not PLAYWRIGHT_AVAILABLE:
            logger.warning("Playwright not available")
            return []
        try:
            with sync_playwright() as p:
                # Launch lightweight browser with aggressive performance flags
                browser = p.chromium.launch(
                    executable_path=self.chromium_path,
                    headless=True,
                    args=[
                        '--disable-blink-features=AutomationControlled',
                        '--disable-dev-shm-usage',  # Overcome limited resource problems
                        '--no-sandbox',  # Required for some environments
                        '--disable-setuid-sandbox',
                        '--disable-gpu',  # Not needed in headless
                        '--disable-software-rasterizer'
                    ]
                )
                try:
                    context = browser.new_context(
                        user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
                    )
                    page = context.new_page()

                    # Block heavy resources (images, media, fonts, css) for speed.
                    def route_intercept(route):
                        if route.request.resource_type in ["image", "media", "font", "stylesheet", "video"]:
                            route.abort()
                        else:
                            route.continue_()
                    page.route("**/*", route_intercept)

                    logger.info(f"Scraping {source_name}...")
                    page.goto(source_info['url'], timeout=timeout * 1000, wait_until="domcontentloaded")

                    # Wait for at least one tweet <article> to render.
                    try:
                        page.wait_for_selector("article", timeout=15000)
                    except PlaywrightTimeoutError:
                        logger.warning(f"Timeout waiting for tweets from {source_name}")
                        return []

                    # Extract tweet texts (capped at 15 for speed).
                    tweet_elements = page.locator("article div[data-testid='tweetText']").all()
                    news_items = []
                    for idx, element in enumerate(tweet_elements[:15]):
                        try:
                            text = element.text_content()
                            if not text or len(text) < 10:
                                continue
                            # Normalize whitespace.
                            text = re.sub(r'\s+', ' ', text.strip())
                            # Skip retweets and replies.
                            if text.startswith('RT @') or text.startswith('@'):
                                continue
                            # Categorize and analyze.
                            category = self._categorize_text(text, source_info['specialization'])
                            sentiment = self._analyze_sentiment(text)
                            impact = self._assess_impact(source_info['weight'], text)
                            is_breaking = self._detect_breaking_news(text)
                            summary = self._extract_summary(text) if len(text) > 150 else text
                            news_items.append({
                                # NOTE: built-in hash() varies between processes
                                # (PYTHONHASHSEED); ids are only unique within a run.
                                'id': hash(f"{source_name}_{idx}_{datetime.now().isoformat()}"),
                                'title': text,
                                'summary': summary,
                                'source': source_info['handle'],
                                'category': category,
                                'timestamp': datetime.now() - timedelta(minutes=idx),  # Approximate time
                                'sentiment': sentiment,
                                'impact': impact,
                                'url': source_info['url'],
                                'likes': 0,
                                'retweets': 0,
                                'is_breaking': is_breaking,
                                'source_weight': source_info['weight'],
                                'from_web': True
                            })
                        except Exception as e:
                            logger.debug(f"Error parsing tweet from {source_name}: {e}")
                            continue
                    logger.info(f"Scraped {len(news_items)} tweets from {source_name}")
                    return news_items
                finally:
                    # Always release the browser, even when goto/parsing raises.
                    browser.close()
        except Exception as e:
            logger.error(f"Error scraping {source_name}: {e}")
            return []

    def scrape_twitter_news(self, max_tweets: int = 100) -> List[Dict]:
        """
        Scrape latest financial news from all SOURCES in parallel.

        High-weight sources are submitted first; results are deduplicated
        on the first 100 characters of the tweet text, then sorted by
        breaking-news flag, high impact, and recency.

        Args:
            max_tweets: Maximum number of items returned.

        Returns:
            Ranked list of news-item dicts; mock data when Playwright is
            unavailable or every source failed.
        """
        if not PLAYWRIGHT_AVAILABLE:
            logger.info("Playwright not available - using mock data")
            return self._get_mock_news()
        all_news = []
        seen_texts = set()
        # Sort sources by weight (priority) - scrape high-value sources first.
        sorted_sources = sorted(
            self.SOURCES.items(),
            key=lambda x: x[1]['weight'],
            reverse=True
        )
        # Scrape sources in parallel with moderate concurrency:
        # 8 workers = 19 sources in 3 batches (~60-90 seconds total).
        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = []
            for name, info in sorted_sources:
                future = executor.submit(self._scrape_twitter_profile, name, info, timeout=30)
                futures.append((future, name))
            for future, source_name in futures:
                try:
                    # Wait max 35 seconds per source.
                    news_items = future.result(timeout=35)
                    # Deduplicate on a prefix of the tweet text.
                    unique_items = []
                    for item in news_items:
                        text_hash = hash(item['title'][:100])
                        if text_hash not in seen_texts:
                            seen_texts.add(text_hash)
                            unique_items.append(item)
                    all_news.extend(unique_items)
                    if len(unique_items) > 0:
                        logger.info(f"Fetched {len(unique_items)} unique tweets from {source_name}")
                except FuturesTimeoutError:
                    logger.warning(f"Timeout scraping {source_name} - skipping")
                except Exception as e:
                    logger.error(f"Error processing {source_name}: {e}")
        # If no news was fetched, use mock data.
        if not all_news:
            logger.warning("No tweets fetched - using mock data")
            return self._get_mock_news()
        # Sort by breaking news, then impact, then timestamp.
        all_news.sort(
            key=lambda x: (x['is_breaking'], x['impact'] == 'high', x['timestamp']),
            reverse=True
        )
        logger.info(f"Total unique tweets: {len(all_news)}")
        return all_news[:max_tweets]

    def _categorize_text(self, text: str, source_specialization: List[str]) -> str:
        """
        Categorize text as 'macro', 'markets', or 'geopolitical'.

        Word-boundary keyword hits are counted per category, boosted 1.5x
        for categories the source specializes in. Ties (including the
        all-zero case) resolve in dict order: macro, markets, geopolitical.
        """
        text_lower = text.lower()
        macro_score = self._count_keyword_hits(text_lower, self.MACRO_KEYWORDS)
        market_score = self._count_keyword_hits(text_lower, self.MARKET_KEYWORDS)
        geo_score = self._count_keyword_hits(text_lower, self.GEOPOLITICAL_KEYWORDS)
        # Boost scores based on source specialization.
        if 'macro' in source_specialization:
            macro_score *= 1.5
        if 'markets' in source_specialization:
            market_score *= 1.5
        if 'geopolitical' in source_specialization:
            geo_score *= 1.5
        # Return category with highest score.
        scores = {'macro': macro_score, 'markets': market_score, 'geopolitical': geo_score}
        return max(scores, key=scores.get)

    def _analyze_sentiment(self, text: str) -> str:
        """
        Simple keyword-based sentiment analysis for trading.

        Returns:
            'positive', 'negative', or 'neutral' by comparing word-boundary
            keyword counts (so 'up' no longer fires inside 'supply').
        """
        text_lower = text.lower()
        positive_keywords = ['surge', 'rally', 'gain', 'rise', 'up', 'bullish', 'strong', 'beat', 'exceed']
        negative_keywords = ['crash', 'plunge', 'fall', 'down', 'bearish', 'weak', 'miss', 'below', 'loss']
        pos_count = self._count_keyword_hits(text_lower, positive_keywords)
        neg_count = self._count_keyword_hits(text_lower, negative_keywords)
        if pos_count > neg_count:
            return 'positive'
        elif neg_count > pos_count:
            return 'negative'
        return 'neutral'

    def _assess_impact(self, source_weight: float, text: str) -> str:
        """
        Assess market impact from source weight plus urgency keywords.

        Each word-boundary keyword hit adds 0.3 to the source weight;
        thresholds: >= 1.8 high, >= 1.4 medium, else low.
        """
        text_lower = text.lower()
        high_impact_keywords = ['breaking', 'alert', 'urgent', 'flash', 'fed', 'powell', 'rate', 'war']
        impact_score = self._count_keyword_hits(text_lower, high_impact_keywords)
        total_impact = source_weight + (impact_score * 0.3)
        if total_impact >= 1.8:
            return 'high'
        elif total_impact >= 1.4:
            return 'medium'
        return 'low'

    def _detect_breaking_news(self, text: str) -> bool:
        """
        Detect if news is breaking/urgent.

        Deliberately substring-based so the siren emoji and decorated
        forms like '*BREAKING*' still match ('*breaking*' itself was
        removed as redundant with the 'breaking' substring test).
        """
        text_lower = text.lower()
        breaking_keywords = ['breaking', 'alert', 'urgent', 'flash', '🚨']
        return any(kw in text_lower for kw in breaking_keywords)

    def _extract_summary(self, text: str) -> str:
        """Return text unchanged if <= 150 chars, else the first 147 + '...'."""
        if len(text) <= 150:
            return text
        return text[:147] + "..."

    def _get_mock_news(self) -> List[Dict]:
        """Return static placeholder items when scraping is impossible/failed."""
        mock_news = [
            {
                'id': hash('mock1'),
                'title': 'Fed signals potential rate pause as inflation moderates',
                'summary': 'Fed signals potential rate pause as inflation moderates',
                'source': 'Mock Data',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(minutes=5),
                'sentiment': 'neutral',
                'impact': 'high',
                'url': 'https://x.com',
                'likes': 0,
                'retweets': 0,
                'is_breaking': False,
                'source_weight': 1.5,
                'from_web': True
            },
            {
                'id': hash('mock2'),
                'title': 'S&P 500 futures rise ahead of key earnings reports',
                'summary': 'S&P 500 futures rise ahead of key earnings reports',
                'source': 'Mock Data',
                'category': 'markets',
                'timestamp': datetime.now() - timedelta(minutes=15),
                'sentiment': 'positive',
                'impact': 'medium',
                'url': 'https://x.com',
                'likes': 0,
                'retweets': 0,
                'is_breaking': False,
                'source_weight': 1.5,
                'from_web': True
            }
        ]
        return mock_news

    def get_statistics(self) -> Dict:
        """
        Get statistics about cached news.

        Note: statistics are now managed by NewsCacheManager; this method
        returns empty stats for backward compatibility.
        """
        return {
            'total': 0,
            'high_impact': 0,
            'breaking': 0,
            'last_update': 'Managed by cache',
            'by_category': {}
        }