"""
Professional Finance News Monitor using snscrape.

Real-time tracking: Macro, Markets, Geopolitical intelligence.
Optimized for low-latency trading decisions.
"""

import logging
import re
import time
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

import pandas as pd

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    import streamlit as st
except ImportError:
    # Streamlit is only used for result caching; the monitor still works
    # without it because get_news() keeps its own per-instance TTL cache.
    st = None
    logger.warning("streamlit not available - st.cache_data caching disabled")

try:
    import snscrape.modules.twitter as sntwitter
    SNSCRAPE_AVAILABLE = True
except ImportError:
    SNSCRAPE_AVAILABLE = False
    logger.warning("snscrape not available. Install with: pip install snscrape")


def _cache_data(ttl: int):
    """Return ``st.cache_data(ttl=ttl)``, or a pass-through decorator without Streamlit."""
    if st is not None:
        return st.cache_data(ttl=ttl)

    def passthrough(func):
        return func

    return passthrough


class FinanceNewsMonitor:
    """
    Professional-grade financial news aggregator.

    Scrapes recent tweets from the accounts listed in ``SOURCES`` (via
    snscrape), tags each one with a category, sentiment, impact level and a
    breaking-news flag, and exposes the result as filtered DataFrames.
    When snscrape is missing or every source fails, a fixed set of mock
    items is returned instead.
    """

    # Seconds a fetched feed stays fresh (shared by st.cache_data and get_news)
    CACHE_TTL_SECONDS = 180  # 3 minutes for low latency

    # Tweets older than this are ignored when scraping
    MAX_TWEET_AGE = timedelta(hours=24)

    # Matches URLs that are stripped from display summaries
    _URL_PATTERN = re.compile(r'http\S+')

    # Premium financial sources - expanded coverage
    SOURCES = {
        # ===== TIER 1: Major Financial News =====
        'reuters': {
            'handle': '@Reuters',
            'weight': 1.5,
            'specialization': ['macro', 'geopolitical', 'markets']
        },
        'bloomberg': {
            'handle': '@business',
            'weight': 1.5,
            'specialization': ['macro', 'markets']
        },
        'ft': {
            'handle': '@FT',
            'weight': 1.4,
            'specialization': ['macro', 'markets']
        },
        'economist': {
            'handle': '@TheEconomist',
            'weight': 1.3,
            'specialization': ['macro', 'geopolitical']
        },
        'wsj': {
            'handle': '@WSJ',
            'weight': 1.4,
            'specialization': ['markets', 'macro']
        },
        'bloomberg_terminal': {
            'handle': '@Bloomberg',
            'weight': 1.5,
            'specialization': ['macro', 'markets']
        },
        'cnbc': {
            'handle': '@CNBC',
            'weight': 1.2,
            'specialization': ['markets']
        },
        'marketwatch': {
            'handle': '@MarketWatch',
            'weight': 1.1,
            'specialization': ['markets']
        },

        # ===== TIER 2: Geopolitical Intelligence =====
        'bbc_world': {
            'handle': '@BBCWorld',
            'weight': 1.4,
            'specialization': ['geopolitical']
        },
        'afp': {
            'handle': '@AFP',
            'weight': 1.3,
            'specialization': ['geopolitical']
        },
        'aljazeera': {
            'handle': '@AlJazeera',
            'weight': 1.2,
            'specialization': ['geopolitical']
        },
        'politico': {
            'handle': '@politico',
            'weight': 1.2,
            'specialization': ['geopolitical', 'macro']
        },
        'dw_news': {
            'handle': '@dwnews',
            'weight': 1.2,
            'specialization': ['geopolitical']
        },

        # ===== TIER 3: Central Banks & Official Sources =====
        'federal_reserve': {
            'handle': '@federalreserve',
            'weight': 2.0,  # Highest priority
            'specialization': ['macro']
        },
        'ecb': {
            'handle': '@ecb',
            'weight': 2.0,
            'specialization': ['macro']
        },
        'lagarde': {
            'handle': '@Lagarde',
            'weight': 1.9,  # ECB President
            'specialization': ['macro']
        },
        'bank_of_england': {
            'handle': '@bankofengland',
            'weight': 1.8,
            'specialization': ['macro']
        },
        'imf': {
            'handle': '@IMFNews',
            'weight': 1.7,
            'specialization': ['macro', 'geopolitical']
        },
        'world_bank': {
            'handle': '@worldbank',
            'weight': 1.6,
            'specialization': ['macro', 'geopolitical']
        },
        'us_treasury': {
            'handle': '@USTreasury',
            'weight': 1.8,
            'specialization': ['macro']
        },

        # ===== TIER 4: Alpha Accounts (Fast Breaking News) =====
        'zerohedge': {
            'handle': '@zerohedge',
            'weight': 1.0,
            'specialization': ['markets', 'macro']
        },
        'first_squawk': {
            'handle': '@FirstSquawk',
            'weight': 1.1,  # Fast alerts
            'specialization': ['markets', 'macro']
        },
        'live_squawk': {
            'handle': '@LiveSquawk',
            'weight': 1.1,  # Real-time market squawks
            'specialization': ['markets', 'macro']
        }
    }

    # Enhanced keyword detection for professional traders
    MACRO_KEYWORDS = [
        # Central Banks & Policy
        'Fed', 'ECB', 'BoE', 'BoJ', 'FOMC', 'Powell', 'Lagarde',
        'interest rate', 'rate cut', 'rate hike', 'QE', 'quantitative',
        'monetary policy', 'dovish', 'hawkish',
        # Economic Indicators
        'GDP', 'inflation', 'CPI', 'PPI', 'PCE', 'NFP', 'payroll',
        'unemployment', 'jobless', 'retail sales', 'PMI', 'ISM',
        'consumer confidence', 'durable goods', 'housing starts',
        # Fiscal & Economic
        'recession', 'stimulus', 'fiscal policy', 'treasury',
        'yield curve', 'bond market'
    ]

    GEO_KEYWORDS = [
        # Conflict & Security
        'war', 'conflict', 'military', 'missile', 'attack', 'invasion',
        'sanctions', 'embargo', 'blockade',
        # Political
        'election', 'impeachment', 'coup', 'protest', 'unrest',
        'geopolitical', 'tension', 'crisis', 'dispute',
        # Trade & Relations
        'trade war', 'tariff', 'trade deal', 'summit', 'treaty',
        'China', 'Russia', 'Taiwan', 'Middle East', 'Ukraine'
    ]

    MARKET_KEYWORDS = [
        # Indices & General
        'S&P', 'Nasdaq', 'Dow', 'Russell', 'VIX', 'volatility',
        'rally', 'sell-off', 'correction', 'crash', 'bull', 'bear',
        # Corporate Events
        'earnings', 'EPS', 'revenue', 'guidance', 'beat', 'miss',
        'IPO', 'merger', 'acquisition', 'M&A', 'buyback', 'dividend',
        # Sectors & Assets
        'tech stocks', 'banks', 'energy', 'commodities', 'crypto',
        'Bitcoin', 'oil', 'gold', 'dollar', 'DXY'
    ]

    # High-impact market-moving keywords
    BREAKING_KEYWORDS = [
        'BREAKING', 'ALERT', 'URGENT', 'just in', 'developing',
        'Fed', 'Powell', 'emergency', 'unexpected', 'surprise'
    ]

    # Word lists used by _analyze_sentiment (matched as lowercase substrings)
    POSITIVE_WORDS = [
        'surge', 'rally', 'soar', 'jump', 'gain', 'rise', 'climb',
        'growth', 'positive', 'strong', 'robust', 'beat', 'exceed',
        'outperform', 'record high', 'breakthrough', 'optimistic'
    ]

    NEGATIVE_WORDS = [
        'plunge', 'crash', 'tumble', 'fall', 'drop', 'decline', 'slump',
        'loss', 'weak', 'fragile', 'crisis', 'concern', 'risk', 'fear',
        'miss', 'disappoint', 'warning', 'downgrade', 'recession'
    ]

    def __init__(self):
        """Start with an empty cache; the first get_news() call fetches."""
        self.news_cache = []
        self.last_fetch = None
        self.cache_ttl = self.CACHE_TTL_SECONDS

    @_cache_data(ttl=CACHE_TTL_SECONDS)
    def scrape_twitter_news(_self, max_tweets: int = 100) -> List[Dict]:
        """
        Scrape latest financial news with caching.

        The first parameter is named ``_self`` on purpose: Streamlit's
        ``st.cache_data`` skips hashing for underscore-prefixed arguments.

        Args:
            max_tweets: Total tweets to fetch (distributed across sources,
                with a minimum of 5 per source).

        Returns:
            A list of news dicts sorted breaking-first, then high-impact,
            then newest. Mock data is returned when snscrape is missing,
            when every source failed, or when nothing was collected.
        """
        if not SNSCRAPE_AVAILABLE:
            logger.info("snscrape not available - using mock data")
            return _self._get_mock_news()

        all_tweets = []
        tweets_per_source = max(5, max_tweets // len(_self.SOURCES))
        failed_sources = 0
        # Aware "now" computed once so every tweet uses the same age cut-off.
        now_utc = datetime.now(timezone.utc)

        for source_name, source_info in _self.SOURCES.items():
            handle = source_info['handle'].replace('@', '')
            # Optimized query: exclude replies and retweets for signal clarity
            query = f"from:{handle} -filter:replies -filter:retweets"

            try:
                scraped = 0
                for tweet in sntwitter.TwitterSearchScraper(query).get_items():
                    if scraped >= tweets_per_source:
                        break
                    # Results arrive newest-first, so the first stale tweet
                    # means the rest are stale too: stop instead of paging
                    # through the account's whole history.
                    if not _self._is_recent(tweet.date, now_utc):
                        break

                    all_tweets.append(
                        _self._tweet_to_item(tweet, source_name, source_info)
                    )
                    scraped += 1
            except Exception as e:
                # Best-effort per source: one failing account must not
                # abort the whole feed.
                failed_sources += 1
                error_msg = str(e).lower()
                if 'blocked' in error_msg or '404' in error_msg:
                    logger.warning(
                        "Twitter/X API blocked access for %s", source_name
                    )
                else:
                    logger.error("Error scraping %s: %s", source_name, e)
                continue

        # If Twitter/X blocked all sources, fall back to mock data
        if failed_sources >= len(_self.SOURCES) or not all_tweets:
            logger.warning(
                "Twitter/X API unavailable - falling back to mock data for demonstration"
            )
            return _self._get_mock_news()

        # Sort by impact and timestamp
        all_tweets.sort(
            key=lambda x: (x['is_breaking'], x['impact'] == 'high', x['timestamp']),
            reverse=True
        )

        return all_tweets

    def _is_recent(self, tweet_date: datetime,
                   now: Optional[datetime] = None) -> bool:
        """
        Return True when ``tweet_date`` is within ``MAX_TWEET_AGE`` of ``now``.

        Naive datetimes are interpreted as UTC so that aware and naive values
        can be compared without raising ``TypeError``.

        Args:
            tweet_date: Publication time of the tweet.
            now: Reference time; defaults to the current UTC time.
        """
        if now is None:
            now = datetime.now(timezone.utc)
        elif now.tzinfo is None:
            now = now.replace(tzinfo=timezone.utc)
        if tweet_date.tzinfo is None:
            tweet_date = tweet_date.replace(tzinfo=timezone.utc)
        return now - tweet_date <= self.MAX_TWEET_AGE

    def _tweet_to_item(self, tweet, source_name: str, source_info: Dict) -> Dict:
        """Convert one scraped tweet into the news-item dict used by the feed."""
        text = tweet.content
        weight = source_info['weight']
        return {
            'id': tweet.id,
            'title': text,
            'summary': self._extract_summary(text),
            'source': source_name.capitalize(),
            'category': self._categorize_tweet(text, source_info['specialization']),
            'timestamp': tweet.date,
            'sentiment': self._analyze_sentiment(text),
            'impact': self._assess_impact(tweet, weight),
            'url': tweet.url,
            'likes': tweet.likeCount or 0,
            'retweets': tweet.retweetCount or 0,
            'is_breaking': self._detect_breaking_news(text),
            'source_weight': weight
        }

    def _categorize_tweet(self, text: str, source_specialization: List[str]) -> str:
        """
        Advanced categorization with source specialization.

        Each keyword found in the text adds 2 points to its category; a
        category listed in ``source_specialization`` has its score multiplied
        by 1.5. Ties resolve in the order macro, geopolitical, markets.

        NOTE(review): keywords are matched as case-insensitive substrings, so
        short ones can fire inside longer words (e.g. 'Dow' in "down", 'war'
        in "warning"). Tightening this changes classification results and is
        left as a deliberate follow-up.

        Returns:
            'macro', 'geopolitical', 'markets', or 'general' when no keyword
            matched.
        """
        text_lower = text.lower()

        def score(keywords: List[str], label: str) -> float:
            hits = sum(1 for kw in keywords if kw.lower() in text_lower)
            points = 2 * hits
            # Boost scores based on source specialization
            return points * 1.5 if label in source_specialization else points

        scores = {
            'macro': score(self.MACRO_KEYWORDS, 'macro'),
            'geopolitical': score(self.GEO_KEYWORDS, 'geopolitical'),
            'markets': score(self.MARKET_KEYWORDS, 'markets')
        }

        best = max(scores, key=scores.get)
        return best if scores[best] > 0 else 'general'

    def _analyze_sentiment(self, text: str) -> str:
        """
        Professional sentiment analysis for trading.

        Counts 2 points per positive/negative word found (substring match on
        the lowercased text) and requires a lead of more than 1 point for a
        non-neutral verdict.

        Returns:
            'positive', 'negative' or 'neutral'.
        """
        text_lower = text.lower()
        pos_count = 2 * sum(1 for word in self.POSITIVE_WORDS if word in text_lower)
        neg_count = 2 * sum(1 for word in self.NEGATIVE_WORDS if word in text_lower)

        # Threshold for clear signal
        if pos_count > neg_count + 1:
            return 'positive'
        if neg_count > pos_count + 1:
            return 'negative'
        return 'neutral'

    def _assess_impact(self, tweet, source_weight: float) -> str:
        """
        Assess market impact based on engagement and source credibility.

        Args:
            tweet: Object exposing ``content``, ``likeCount`` and
                ``retweetCount`` (counts may be ``None``).
            source_weight: Credibility weight of the tweeting source.

        Returns:
            'high', 'medium' or 'low'.
        """
        engagement = (tweet.likeCount or 0) + (tweet.retweetCount or 0) * 2
        weighted_engagement = engagement * source_weight

        # Breaking news always high impact
        if self._detect_breaking_news(tweet.content):
            return 'high'

        if weighted_engagement > 1500 or source_weight >= 2.0:
            return 'high'
        if weighted_engagement > 300:
            return 'medium'
        return 'low'

    def _detect_breaking_news(self, text: str) -> bool:
        """Detect breaking/urgent news for immediate alerts (case-insensitive)."""
        text_upper = text.upper()
        return any(
            keyword.upper() in text_upper for keyword in self.BREAKING_KEYWORDS
        )

    def _extract_summary(self, text: str, max_length: int = 200) -> str:
        """
        Extract clean summary for display.

        URLs are removed and surrounding whitespace stripped; text longer
        than ``max_length`` is cut and suffixed with '...'.
        """
        cleaned = self._URL_PATTERN.sub('', text).strip()
        if len(cleaned) <= max_length:
            return cleaned
        return cleaned[:max_length] + '...'

    def _get_mock_news(self) -> List[Dict]:
        """Mock news data when snscrape is unavailable - Showcases all source types."""
        now = datetime.now()
        return [
            # Tier 3: Central Bank - BREAKING
            {
                'id': 1,
                'title': 'BREAKING: Federal Reserve announces emergency rate cut of 50bps - Powell cites economic uncertainty',
                'summary': 'BREAKING: Fed emergency rate cut 50bps',
                'source': 'Federal Reserve',
                'category': 'macro',
                'timestamp': now - timedelta(minutes=5),
                'sentiment': 'negative',
                'impact': 'high',
                'url': 'https://twitter.com/federalreserve',
                'likes': 5000,
                'retweets': 2000,
                'is_breaking': True,
                'source_weight': 2.0
            },
            # Tier 4: Alpha Account - Fast Alert
            {
                'id': 2,
                'title': '*FIRST SQUAWK: S&P 500 FUTURES DROP 2% AFTER FED ANNOUNCEMENT',
                'summary': '*FIRST SQUAWK: S&P 500 futures drop 2%',
                'source': 'First Squawk',
                'category': 'markets',
                'timestamp': now - timedelta(minutes=10),
                'sentiment': 'negative',
                'impact': 'high',
                'url': 'https://twitter.com/FirstSquawk',
                'likes': 1500,
                'retweets': 600,
                'is_breaking': False,
                'source_weight': 1.1
            },
            # Tier 1: Bloomberg - Markets
            {
                'id': 3,
                'title': 'Apple reports earnings beat with $123B revenue, raises dividend by 4% - Stock up 3% after hours',
                'summary': 'Apple beats earnings, raises dividend 4%',
                'source': 'Bloomberg',
                'category': 'markets',
                'timestamp': now - timedelta(minutes=25),
                'sentiment': 'positive',
                'impact': 'high',
                'url': 'https://twitter.com/business',
                'likes': 2800,
                'retweets': 900,
                'is_breaking': False,
                'source_weight': 1.5
            },
            # Tier 3: ECB President
            {
                'id': 4,
                'title': 'ECB President Lagarde: Inflation remains above target, rates to stay higher for longer',
                'summary': 'Lagarde: rates to stay higher for longer',
                'source': 'Lagarde',
                'category': 'macro',
                'timestamp': now - timedelta(minutes=45),
                'sentiment': 'neutral',
                'impact': 'high',
                'url': 'https://twitter.com/Lagarde',
                'likes': 1200,
                'retweets': 400,
                'is_breaking': False,
                'source_weight': 1.9
            },
            # Tier 2: Geopolitical - BBC
            {
                'id': 5,
                'title': 'Ukraine conflict: New peace talks scheduled as tensions ease in Eastern Europe',
                'summary': 'Ukraine: New peace talks scheduled',
                'source': 'BBC World',
                'category': 'geopolitical',
                'timestamp': now - timedelta(hours=1),
                'sentiment': 'positive',
                'impact': 'medium',
                'url': 'https://twitter.com/BBCWorld',
                'likes': 3500,
                'retweets': 1200,
                'is_breaking': False,
                'source_weight': 1.4
            },
            # Tier 1: Reuters - Macro
            {
                'id': 6,
                'title': 'US GDP growth revised up to 2.8% in Q4, beating economists expectations of 2.5%',
                'summary': 'US GDP growth revised up to 2.8% in Q4',
                'source': 'Reuters',
                'category': 'macro',
                'timestamp': now - timedelta(hours=2),
                'sentiment': 'positive',
                'impact': 'medium',
                'url': 'https://twitter.com/Reuters',
                'likes': 1800,
                'retweets': 600,
                'is_breaking': False,
                'source_weight': 1.5
            },
            # Tier 4: Live Squawk
            {
                'id': 7,
                'title': '*LIVE SQUAWK: Oil prices surge 5% on Middle East supply concerns, Brent crude at $92/barrel',
                'summary': '*LIVE SQUAWK: Oil surges 5% on supply fears',
                'source': 'Live Squawk',
                'category': 'markets',
                'timestamp': now - timedelta(hours=3),
                'sentiment': 'neutral',
                'impact': 'medium',
                'url': 'https://twitter.com/LiveSquawk',
                'likes': 900,
                'retweets': 350,
                'is_breaking': False,
                'source_weight': 1.1
            },
            # Tier 3: IMF
            {
                'id': 8,
                'title': 'IMF upgrades global growth forecast to 3.2% for 2024, warns of recession risks in Europe',
                'summary': 'IMF upgrades global growth to 3.2%',
                'source': 'IMF',
                'category': 'macro',
                'timestamp': now - timedelta(hours=4),
                'sentiment': 'neutral',
                'impact': 'medium',
                'url': 'https://twitter.com/IMFNews',
                'likes': 800,
                'retweets': 300,
                'is_breaking': False,
                'source_weight': 1.7
            },
            # Tier 2: Politico - Geopolitical
            {
                'id': 9,
                'title': 'US-China trade talks resume in Washington, focus on technology transfer and tariffs',
                'summary': 'US-China trade talks resume',
                'source': 'Politico',
                'category': 'geopolitical',
                'timestamp': now - timedelta(hours=5),
                'sentiment': 'neutral',
                'impact': 'low',
                'url': 'https://twitter.com/politico',
                'likes': 600,
                'retweets': 200,
                'is_breaking': False,
                'source_weight': 1.2
            },
            # Tier 1: FT - Markets
            {
                'id': 10,
                'title': 'Bank of America cuts recession probability to 20%, cites resilient consumer spending',
                'summary': 'BofA cuts recession probability to 20%',
                'source': 'FT',
                'category': 'markets',
                'timestamp': now - timedelta(hours=6),
                'sentiment': 'positive',
                'impact': 'low',
                'url': 'https://twitter.com/FT',
                'likes': 700,
                'retweets': 250,
                'is_breaking': False,
                'source_weight': 1.4
            }
        ]

    def _invalidate_scrape_cache(self) -> None:
        """Drop Streamlit's cached scrape result so the next call refetches."""
        # Look the attribute up in the class dict to get the raw cached
        # function object; a plain function (no Streamlit) has no clear().
        cached = vars(FinanceNewsMonitor).get('scrape_twitter_news')
        clear = getattr(cached, 'clear', None)
        if callable(clear):
            clear()

    def get_news(self, category: str = 'all', sentiment: str = 'all',
                 impact: str = 'all', refresh: bool = False) -> pd.DataFrame:
        """
        Get filtered news with intelligent caching.

        Args:
            category: 'all', 'macro', 'geopolitical', 'markets'
            sentiment: 'all', 'positive', 'negative', 'neutral'
            impact: 'all', 'high', 'medium', 'low'
            refresh: Force refresh cache

        Returns:
            DataFrame with one row per news item (empty when nothing matches).
        """
        # total_seconds() rather than .seconds: the latter drops whole days,
        # so a day-old cache would look only a few seconds old.
        cache_is_stale = (
            self.last_fetch is None
            or (datetime.now() - self.last_fetch).total_seconds() > self.cache_ttl
        )
        if refresh or cache_is_stale:
            if refresh:
                # Without this, st.cache_data would hand back the old result.
                self._invalidate_scrape_cache()
            self.news_cache = self.scrape_twitter_news(max_tweets=100)
            self.last_fetch = datetime.now()

        # Apply filters
        wanted = {'category': category, 'sentiment': sentiment, 'impact': impact}
        active = {key: value for key, value in wanted.items() if value != 'all'}
        news = [
            item for item in self.news_cache
            if all(item[key] == value for key, value in active.items())
        ]

        df = pd.DataFrame(news)
        if not df.empty:
            df['timestamp'] = pd.to_datetime(df['timestamp'])

        return df

    def get_breaking_news(self) -> pd.DataFrame:
        """Get up to 10 items flagged as breaking, for alerts."""
        df = self.get_news()
        if df.empty:
            return df
        return df[df['is_breaking'].astype(bool)].head(10)

    def get_statistics(self) -> Dict:
        """
        Get news feed statistics.

        Returns:
            Dict with 'total', 'high_impact', 'breaking', 'last_update' and
            'by_category' (counts for macro / geopolitical / markets). The
            same keys are present whether or not the cache is empty.
        """
        per_category = Counter(item['category'] for item in self.news_cache)
        by_category = {
            name: per_category.get(name, 0)
            for name in ('macro', 'geopolitical', 'markets')
        }

        if not self.news_cache:
            return {
                'total': 0,
                'high_impact': 0,
                'breaking': 0,
                'last_update': 'Never',
                'by_category': by_category
            }

        return {
            'total': len(self.news_cache),
            'high_impact': sum(1 for n in self.news_cache if n['impact'] == 'high'),
            'breaking': sum(1 for n in self.news_cache if n['is_breaking']),
            'last_update': (
                self.last_fetch.strftime('%H:%M:%S') if self.last_fetch else 'Never'
            ),
            'by_category': by_category
        }