# FinancialPlatform/app/services/news_monitor.py
"""
Professional Finance News Monitor using snscrape
Real-time tracking: Macro, Markets, Geopolitical intelligence
Optimized for low-latency trading decisions
"""
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import streamlit as st
import time
import logging
import re
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
try:
import snscrape.modules.twitter as sntwitter
SNSCRAPE_AVAILABLE = True
except ImportError:
SNSCRAPE_AVAILABLE = False
logger.warning("snscrape not available. Install with: pip install snscrape")
class FinanceNewsMonitor:
    """
    Professional-grade financial news aggregator
    Sources: Bloomberg, Reuters, WSJ, FT, CNBC, ZeroHedge
    """
    # Premium financial sources - expanded coverage.
    # Per source: Twitter/X handle, a credibility weight (scales engagement
    # when assessing impact; weight >= 2.0 is always rated high impact),
    # and topic specializations that boost that topic's score during
    # categorization.
    SOURCES = {
        # ===== TIER 1: Major Financial News =====
        'reuters': {
            'handle': '@Reuters',
            'weight': 1.5,
            'specialization': ['macro', 'geopolitical', 'markets']
        },
        'bloomberg': {
            'handle': '@business',
            'weight': 1.5,
            'specialization': ['macro', 'markets']
        },
        'ft': {
            'handle': '@FT',
            'weight': 1.4,
            'specialization': ['macro', 'markets']
        },
        'economist': {
            'handle': '@TheEconomist',
            'weight': 1.3,
            'specialization': ['macro', 'geopolitical']
        },
        'wsj': {
            'handle': '@WSJ',
            'weight': 1.4,
            'specialization': ['markets', 'macro']
        },
        'bloomberg_terminal': {
            'handle': '@Bloomberg',
            'weight': 1.5,
            'specialization': ['macro', 'markets']
        },
        'cnbc': {
            'handle': '@CNBC',
            'weight': 1.2,
            'specialization': ['markets']
        },
        'marketwatch': {
            'handle': '@MarketWatch',
            'weight': 1.1,
            'specialization': ['markets']
        },
        # ===== TIER 2: Geopolitical Intelligence =====
        'bbc_world': {
            'handle': '@BBCWorld',
            'weight': 1.4,
            'specialization': ['geopolitical']
        },
        'afp': {
            'handle': '@AFP',
            'weight': 1.3,
            'specialization': ['geopolitical']
        },
        'aljazeera': {
            'handle': '@AlJazeera',
            'weight': 1.2,
            'specialization': ['geopolitical']
        },
        'politico': {
            'handle': '@politico',
            'weight': 1.2,
            'specialization': ['geopolitical', 'macro']
        },
        'dw_news': {
            'handle': '@dwnews',
            'weight': 1.2,
            'specialization': ['geopolitical']
        },
        # ===== TIER 3: Central Banks & Official Sources =====
        'federal_reserve': {
            'handle': '@federalreserve',
            'weight': 2.0,  # Highest priority
            'specialization': ['macro']
        },
        'ecb': {
            'handle': '@ecb',
            'weight': 2.0,
            'specialization': ['macro']
        },
        'lagarde': {
            'handle': '@Lagarde',
            'weight': 1.9,  # ECB President
            'specialization': ['macro']
        },
        'bank_of_england': {
            'handle': '@bankofengland',
            'weight': 1.8,
            'specialization': ['macro']
        },
        'imf': {
            'handle': '@IMFNews',
            'weight': 1.7,
            'specialization': ['macro', 'geopolitical']
        },
        'world_bank': {
            'handle': '@worldbank',
            'weight': 1.6,
            'specialization': ['macro', 'geopolitical']
        },
        'us_treasury': {
            'handle': '@USTreasury',
            'weight': 1.8,
            'specialization': ['macro']
        },
        # ===== TIER 4: Alpha Accounts (Fast Breaking News) =====
        'zerohedge': {
            'handle': '@zerohedge',
            'weight': 1.0,
            'specialization': ['markets', 'macro']
        },
        'first_squawk': {
            'handle': '@FirstSquawk',
            'weight': 1.1,  # Fast alerts
            'specialization': ['markets', 'macro']
        },
        'live_squawk': {
            'handle': '@LiveSquawk',
            'weight': 1.1,  # Real-time market squawks
            'specialization': ['markets', 'macro']
        }
    }
    # Enhanced keyword detection for professional traders.
    # NOTE: matched as case-insensitive *substrings* in _categorize_tweet,
    # so e.g. 'war' also hits 'warning' — keep that in mind when extending.
    MACRO_KEYWORDS = [
        # Central Banks & Policy
        'Fed', 'ECB', 'BoE', 'BoJ', 'FOMC', 'Powell', 'Lagarde',
        'interest rate', 'rate cut', 'rate hike', 'QE', 'quantitative',
        'monetary policy', 'dovish', 'hawkish',
        # Economic Indicators
        'GDP', 'inflation', 'CPI', 'PPI', 'PCE', 'NFP', 'payroll',
        'unemployment', 'jobless', 'retail sales', 'PMI', 'ISM',
        'consumer confidence', 'durable goods', 'housing starts',
        # Fiscal & Economic
        'recession', 'stimulus', 'fiscal policy', 'treasury',
        'yield curve', 'bond market'
    ]
    GEO_KEYWORDS = [
        # Conflict & Security
        'war', 'conflict', 'military', 'missile', 'attack', 'invasion',
        'sanctions', 'embargo', 'blockade',
        # Political
        'election', 'impeachment', 'coup', 'protest', 'unrest',
        'geopolitical', 'tension', 'crisis', 'dispute',
        # Trade & Relations
        'trade war', 'tariff', 'trade deal', 'summit', 'treaty',
        'China', 'Russia', 'Taiwan', 'Middle East', 'Ukraine'
    ]
    MARKET_KEYWORDS = [
        # Indices & General
        'S&P', 'Nasdaq', 'Dow', 'Russell', 'VIX', 'volatility',
        'rally', 'sell-off', 'correction', 'crash', 'bull', 'bear',
        # Corporate Events
        'earnings', 'EPS', 'revenue', 'guidance', 'beat', 'miss',
        'IPO', 'merger', 'acquisition', 'M&A', 'buyback', 'dividend',
        # Sectors & Assets
        'tech stocks', 'banks', 'energy', 'commodities', 'crypto',
        'Bitcoin', 'oil', 'gold', 'dollar', 'DXY'
    ]
    # High-impact market-moving keywords: any hit marks a tweet as
    # breaking news and forces its impact rating to 'high'.
    BREAKING_KEYWORDS = [
        'BREAKING', 'ALERT', 'URGENT', 'just in', 'developing',
        'Fed', 'Powell', 'emergency', 'unexpected', 'surprise'
    ]
def __init__(self):
self.news_cache = []
self.last_fetch = None
self.cache_ttl = 180 # 3 minutes for low latency
@st.cache_data(ttl=180)
def scrape_twitter_news(_self, max_tweets: int = 100) -> List[Dict]:
"""
Scrape latest financial news with caching
max_tweets: Total tweets to fetch (distributed across sources)
"""
if not SNSCRAPE_AVAILABLE:
logger.info("snscrape not available - using mock data")
return _self._get_mock_news()
all_tweets = []
tweets_per_source = max(5, max_tweets // len(_self.SOURCES))
failed_sources = 0
for source_name, source_info in _self.SOURCES.items():
try:
handle = source_info['handle'].replace('@', '')
# Optimized query: exclude replies and retweets for signal clarity
query = f"from:{handle} -filter:replies -filter:retweets"
scraped = 0
for tweet in sntwitter.TwitterSearchScraper(query).get_items():
if scraped >= tweets_per_source:
break
# Skip old tweets (>24h)
if (datetime.now() - tweet.date).days > 1:
continue
# Categorize and analyze
category = _self._categorize_tweet(tweet.content, source_info['specialization'])
sentiment = _self._analyze_sentiment(tweet.content)
impact = _self._assess_impact(tweet, source_info['weight'])
is_breaking = _self._detect_breaking_news(tweet.content)
all_tweets.append({
'id': tweet.id,
'title': tweet.content,
'summary': _self._extract_summary(tweet.content),
'source': source_name.capitalize(),
'category': category,
'timestamp': tweet.date,
'sentiment': sentiment,
'impact': impact,
'url': tweet.url,
'likes': tweet.likeCount or 0,
'retweets': tweet.retweetCount or 0,
'is_breaking': is_breaking,
'source_weight': source_info['weight']
})
scraped += 1
except Exception as e:
failed_sources += 1
error_msg = str(e).lower()
if 'blocked' in error_msg or '404' in error_msg:
logger.warning(f"Twitter/X API blocked access for {source_name}")
else:
logger.error(f"Error scraping {source_name}: {e}")
continue
# If Twitter/X blocked all sources, fall back to mock data
if failed_sources >= len(_self.SOURCES) or len(all_tweets) == 0:
logger.warning("Twitter/X API unavailable - falling back to mock data for demonstration")
return _self._get_mock_news()
# Sort by impact and timestamp
all_tweets.sort(
key=lambda x: (x['is_breaking'], x['impact'] == 'high', x['timestamp']),
reverse=True
)
return all_tweets
def _categorize_tweet(self, text: str, source_specialization: List[str]) -> str:
"""Advanced categorization with source specialization"""
text_lower = text.lower()
# Calculate weighted scores
macro_score = sum(2 if kw.lower() in text_lower else 0
for kw in self.MACRO_KEYWORDS)
geo_score = sum(2 if kw.lower() in text_lower else 0
for kw in self.GEO_KEYWORDS)
market_score = sum(2 if kw.lower() in text_lower else 0
for kw in self.MARKET_KEYWORDS)
# Boost scores based on source specialization
if 'macro' in source_specialization:
macro_score *= 1.5
if 'geopolitical' in source_specialization:
geo_score *= 1.5
if 'markets' in source_specialization:
market_score *= 1.5
scores = {
'macro': macro_score,
'geopolitical': geo_score,
'markets': market_score
}
return max(scores, key=scores.get) if max(scores.values()) > 0 else 'general'
def _analyze_sentiment(self, text: str) -> str:
"""Professional sentiment analysis for trading"""
positive_words = [
'surge', 'rally', 'soar', 'jump', 'gain', 'rise', 'climb',
'growth', 'positive', 'strong', 'robust', 'beat', 'exceed',
'outperform', 'record high', 'breakthrough', 'optimistic'
]
negative_words = [
'plunge', 'crash', 'tumble', 'fall', 'drop', 'decline', 'slump',
'loss', 'weak', 'fragile', 'crisis', 'concern', 'risk', 'fear',
'miss', 'disappoint', 'warning', 'downgrade', 'recession'
]
text_lower = text.lower()
pos_count = sum(2 if word in text_lower else 0 for word in positive_words)
neg_count = sum(2 if word in text_lower else 0 for word in negative_words)
# Threshold for clear signal
if pos_count > neg_count + 1:
return 'positive'
elif neg_count > pos_count + 1:
return 'negative'
return 'neutral'
def _assess_impact(self, tweet, source_weight: float) -> str:
"""Assess market impact based on engagement and source credibility"""
engagement = (tweet.likeCount or 0) + (tweet.retweetCount or 0) * 2
weighted_engagement = engagement * source_weight
# Breaking news always high impact
if self._detect_breaking_news(tweet.content):
return 'high'
if weighted_engagement > 1500 or source_weight >= 2.0:
return 'high'
elif weighted_engagement > 300:
return 'medium'
return 'low'
def _detect_breaking_news(self, text: str) -> bool:
"""Detect breaking/urgent news for immediate alerts"""
text_upper = text.upper()
return any(keyword.upper() in text_upper for keyword in self.BREAKING_KEYWORDS)
def _extract_summary(self, text: str, max_length: int = 200) -> str:
"""Extract clean summary for display"""
# Remove URLs
import re
text = re.sub(r'http\S+', '', text)
text = text.strip()
if len(text) <= max_length:
return text
return text[:max_length] + '...'
    def _get_mock_news(self) -> List[Dict]:
        """Mock news data when snscrape is unavailable - Showcases all source types.

        Returns a fixed list of dicts with the same schema produced by
        scrape_twitter_news (id, title, summary, source, category,
        timestamp, sentiment, impact, url, likes, retweets, is_breaking,
        source_weight). Timestamps are relative to "now" so the feed
        always looks fresh in the UI.
        """
        return [
            # Tier 3: Central Bank - BREAKING
            {
                'id': 1,
                'title': 'BREAKING: Federal Reserve announces emergency rate cut of 50bps - Powell cites economic uncertainty',
                'summary': 'BREAKING: Fed emergency rate cut 50bps',
                'source': 'Federal Reserve',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(minutes=5),
                'sentiment': 'negative',
                'impact': 'high',
                'url': 'https://twitter.com/federalreserve',
                'likes': 5000,
                'retweets': 2000,
                'is_breaking': True,
                'source_weight': 2.0
            },
            # Tier 4: Alpha Account - Fast Alert
            {
                'id': 2,
                'title': '*FIRST SQUAWK: S&P 500 FUTURES DROP 2% AFTER FED ANNOUNCEMENT',
                'summary': '*FIRST SQUAWK: S&P 500 futures drop 2%',
                'source': 'First Squawk',
                'category': 'markets',
                'timestamp': datetime.now() - timedelta(minutes=10),
                'sentiment': 'negative',
                'impact': 'high',
                'url': 'https://twitter.com/FirstSquawk',
                'likes': 1500,
                'retweets': 600,
                'is_breaking': False,
                'source_weight': 1.1
            },
            # Tier 1: Bloomberg - Markets
            {
                'id': 3,
                'title': 'Apple reports earnings beat with $123B revenue, raises dividend by 4% - Stock up 3% after hours',
                'summary': 'Apple beats earnings, raises dividend 4%',
                'source': 'Bloomberg',
                'category': 'markets',
                'timestamp': datetime.now() - timedelta(minutes=25),
                'sentiment': 'positive',
                'impact': 'high',
                'url': 'https://twitter.com/business',
                'likes': 2800,
                'retweets': 900,
                'is_breaking': False,
                'source_weight': 1.5
            },
            # Tier 3: ECB President
            {
                'id': 4,
                'title': 'ECB President Lagarde: Inflation remains above target, rates to stay higher for longer',
                'summary': 'Lagarde: rates to stay higher for longer',
                'source': 'Lagarde',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(minutes=45),
                'sentiment': 'neutral',
                'impact': 'high',
                'url': 'https://twitter.com/Lagarde',
                'likes': 1200,
                'retweets': 400,
                'is_breaking': False,
                'source_weight': 1.9
            },
            # Tier 2: Geopolitical - BBC
            {
                'id': 5,
                'title': 'Ukraine conflict: New peace talks scheduled as tensions ease in Eastern Europe',
                'summary': 'Ukraine: New peace talks scheduled',
                'source': 'BBC World',
                'category': 'geopolitical',
                'timestamp': datetime.now() - timedelta(hours=1),
                'sentiment': 'positive',
                'impact': 'medium',
                'url': 'https://twitter.com/BBCWorld',
                'likes': 3500,
                'retweets': 1200,
                'is_breaking': False,
                'source_weight': 1.4
            },
            # Tier 1: Reuters - Macro
            {
                'id': 6,
                'title': 'US GDP growth revised up to 2.8% in Q4, beating economists expectations of 2.5%',
                'summary': 'US GDP growth revised up to 2.8% in Q4',
                'source': 'Reuters',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(hours=2),
                'sentiment': 'positive',
                'impact': 'medium',
                'url': 'https://twitter.com/Reuters',
                'likes': 1800,
                'retweets': 600,
                'is_breaking': False,
                'source_weight': 1.5
            },
            # Tier 4: Live Squawk
            {
                'id': 7,
                'title': '*LIVE SQUAWK: Oil prices surge 5% on Middle East supply concerns, Brent crude at $92/barrel',
                'summary': '*LIVE SQUAWK: Oil surges 5% on supply fears',
                'source': 'Live Squawk',
                'category': 'markets',
                'timestamp': datetime.now() - timedelta(hours=3),
                'sentiment': 'neutral',
                'impact': 'medium',
                'url': 'https://twitter.com/LiveSquawk',
                'likes': 900,
                'retweets': 350,
                'is_breaking': False,
                'source_weight': 1.1
            },
            # Tier 3: IMF
            {
                'id': 8,
                'title': 'IMF upgrades global growth forecast to 3.2% for 2024, warns of recession risks in Europe',
                'summary': 'IMF upgrades global growth to 3.2%',
                'source': 'IMF',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(hours=4),
                'sentiment': 'neutral',
                'impact': 'medium',
                'url': 'https://twitter.com/IMFNews',
                'likes': 800,
                'retweets': 300,
                'is_breaking': False,
                'source_weight': 1.7
            },
            # Tier 2: Politico - Geopolitical
            {
                'id': 9,
                'title': 'US-China trade talks resume in Washington, focus on technology transfer and tariffs',
                'summary': 'US-China trade talks resume',
                'source': 'Politico',
                'category': 'geopolitical',
                'timestamp': datetime.now() - timedelta(hours=5),
                'sentiment': 'neutral',
                'impact': 'low',
                'url': 'https://twitter.com/politico',
                'likes': 600,
                'retweets': 200,
                'is_breaking': False,
                'source_weight': 1.2
            },
            # Tier 1: FT - Markets
            {
                'id': 10,
                'title': 'Bank of America cuts recession probability to 20%, cites resilient consumer spending',
                'summary': 'BofA cuts recession probability to 20%',
                'source': 'FT',
                'category': 'markets',
                'timestamp': datetime.now() - timedelta(hours=6),
                'sentiment': 'positive',
                'impact': 'low',
                'url': 'https://twitter.com/FT',
                'likes': 700,
                'retweets': 250,
                'is_breaking': False,
                'source_weight': 1.4
            }
        ]
def get_news(self, category: str = 'all', sentiment: str = 'all',
impact: str = 'all', refresh: bool = False) -> pd.DataFrame:
"""
Get filtered news with intelligent caching
Args:
category: 'all', 'macro', 'geopolitical', 'markets'
sentiment: 'all', 'positive', 'negative', 'neutral'
impact: 'all', 'high', 'medium', 'low'
refresh: Force refresh cache
"""
# Check cache freshness
if refresh or not self.last_fetch or \
(datetime.now() - self.last_fetch).seconds > self.cache_ttl:
self.news_cache = self.scrape_twitter_news(max_tweets=100)
self.last_fetch = datetime.now()
news = self.news_cache.copy()
# Apply filters
if category != 'all':
news = [n for n in news if n['category'] == category]
if sentiment != 'all':
news = [n for n in news if n['sentiment'] == sentiment]
if impact != 'all':
news = [n for n in news if n['impact'] == impact]
df = pd.DataFrame(news)
if not df.empty:
df['timestamp'] = pd.to_datetime(df['timestamp'])
return df
def get_breaking_news(self) -> pd.DataFrame:
"""Get only breaking/high-impact news for alerts"""
df = self.get_news()
if not df.empty:
return df[df['is_breaking'] == True].head(10)
return df
def get_statistics(self) -> Dict:
"""Get news feed statistics"""
if not self.news_cache:
return {
'total': 0,
'high_impact': 0,
'breaking': 0,
'last_update': 'Never'
}
return {
'total': len(self.news_cache),
'high_impact': len([n for n in self.news_cache if n['impact'] == 'high']),
'breaking': len([n for n in self.news_cache if n['is_breaking']]),
'last_update': self.last_fetch.strftime('%H:%M:%S') if self.last_fetch else 'Never',
'by_category': {
'macro': len([n for n in self.news_cache if n['category'] == 'macro']),
'geopolitical': len([n for n in self.news_cache if n['category'] == 'geopolitical']),
'markets': len([n for n in self.news_cache if n['category'] == 'markets'])
}
}