| | """ |
| | Breaking News Scoring System |
| | Identifies highest-impact financial news using multi-factor weighted scoring |
| | """ |
| |
|
| | import re |
| | from datetime import datetime, timedelta |
| | from typing import Dict, List |
| | import logging |
| |
|
# Module-level logger used by BreakingNewsScorer for info/debug output.
logger = logging.getLogger(__name__)


class BreakingNewsScorer:
    """
    Multi-factor scorer that ranks financial news items by likely impact.

    Each item is a dictionary. The score is the sum of eight independent
    factor scores (keywords, recency, source, engagement, sentiment,
    category, ticker mentions, urgency markers), scaled by the item's
    declared impact level and capped at 100.

    The public keyword/source/ticker tables are plain class attributes and
    are read at call time, so they can be overridden on a subclass or an
    instance.
    """

    # Keywords worth 3 points each (capped at 18 per item).
    CRITICAL_KEYWORDS = [
        # Central banks / monetary policy
        'rate hike', 'rate cut', 'interest rate', 'fed raises', 'fed cuts',
        'fomc decision', 'monetary policy', 'quantitative easing', 'qe',
        'emergency meeting', 'powell', 'lagarde', 'yellen',

        # Market extremes
        'market crash', 'flash crash', 'circuit breaker', 'trading halt',
        'all-time high', 'all time high', 'record high', 'record low',
        'biggest drop', 'biggest gain', 'historic', 'unprecedented',

        # Headline economic data
        'gdp', 'jobs report', 'unemployment', 'inflation',
        'cpi', 'ppi', 'nonfarm payroll', 'nfp',

        # Major corporate events
        'earnings beat', 'earnings miss', 'profit warning',
        'bankruptcy', 'chapter 11', 'delisted',
        'merger', 'acquisition', 'takeover', 'buyout',

        # Geopolitical / political events
        'war', 'invasion', 'sanctions', 'trade war',
        'embargo', 'default', 'debt ceiling', 'shutdown',
        'impeachment', 'coup', 'terrorist attack'
    ]

    # Keywords worth 2 points each (capped at 8 per item).
    HIGH_IMPACT_KEYWORDS = [
        # Price action
        'surge', 'plunge', 'soar', 'tumble', 'rally', 'selloff',
        'volatility', 'whipsaw', 'correction', 'bear market', 'bull market',

        # Secondary economic data
        'retail sales', 'housing starts', 'consumer confidence',
        'manufacturing index', 'pmi', 'trade deficit',

        # Corporate actions
        'revenue beat', 'guidance', 'dividend', 'stock split',
        'ipo', 'listing', 'secondary offering',

        # Crypto, security and regulation
        'bitcoin', 'crypto crash', 'hack', 'breach',
        'antitrust', 'regulation', 'sec investigation',

        # Commodities and supply
        'oil', 'gold', 'crude', 'opec', 'energy crisis',
        'supply chain', 'shortage', 'surplus'
    ]

    # Keywords worth 1 point each (capped at 4 per item).
    MEDIUM_IMPACT_KEYWORDS = [
        'analyst', 'upgrade', 'downgrade', 'target price',
        'forecast', 'outlook', 'projection', 'estimate',
        'conference call', 'ceo', 'cfo', 'executive',
        'lawsuit', 'settlement', 'fine', 'penalty',
        'product launch', 'partnership', 'deal', 'contract'
    ]

    # Source-name fragment -> weight; the source score is weight * 10.
    # Order matters: the first fragment found in the normalized source name
    # wins, so 'walter_bloomberg' must stay ahead of 'bloomberg'.
    SOURCE_WEIGHTS = {
        'walter_bloomberg': 2.0,
        'fxhedge': 2.0,
        'deitaone': 2.0,
        'firstsquawk': 1.9,
        'livesquawk': 1.9,

        'reuters': 1.8,
        'bloomberg': 1.8,
        'ft': 1.7,
        'wsj': 1.7,

        'cnbc': 1.5,
        'bbc': 1.5,
        'marketwatch': 1.5,

        'zerohedge': 1.2,
        'wallstreetbets': 1.2,
        'reddit': 1.2,

        # Fallback weight for sources matching none of the fragments above.
        'default': 1.0
    }

    # Symbols that earn a ticker bonus when mentioned as a whole token.
    MAJOR_TICKERS = [
        'SPY', 'QQQ', 'DIA', 'IWM',
        'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA', 'TSLA', 'META',
        'JPM', 'BAC', 'GS', 'MS', 'WFC',
        'XOM', 'CVX', 'COP',
        'BTC', 'ETH', 'BTCUSD', 'ETHUSD'
    ]

    # (item key, divisor) pairs for engagement; each contributes at most 5.
    _ENGAGEMENT_FIELDS = (
        ('likes', 1000),
        ('retweets', 500),
        ('reddit_score', 1000),
        ('reddit_comments', 200),
    )

    # Urgency markers, compiled once instead of on every call.
    _URGENCY_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'\bbreaking\b', r'\balert\b', r'\burgent\b', r'\bjust in\b',
            r'\bemergency\b', r'\bimmediate\b', r'\bnow\b', r'\btoday\b',
            r'‼️', r'🚨', r'⚠️', r'🔴', r'❗'
        )
    )

    # Splits upper-cased text into alphanumeric tokens for ticker lookup.
    _TOKEN_PATTERN = re.compile(r'[A-Z0-9]+')

    def __init__(self):
        """Initialize the breaking news scorer (stateless; only logs)."""
        logger.info("BreakingNewsScorer initialized")

    def calculate_impact_score(self, news_item: Dict) -> float:
        """
        Calculate the comprehensive impact score for a news item.

        Args:
            news_item: Dictionary of news metadata. Keys read here are
                'title', 'summary', 'source', 'timestamp', 'sentiment',
                'impact' and 'category'; engagement counters are read by
                ``_score_engagement``. All keys are optional, and text
                fields that are missing or ``None`` are treated as empty.

        Returns:
            Impact score in the range 0-100 (higher = more impactful).
        """
        # `or ''` also covers keys that are present but explicitly None.
        title = (news_item.get('title') or '').lower()
        summary = (news_item.get('summary') or '').lower()
        source = (news_item.get('source') or '').lower()
        timestamp = news_item.get('timestamp', datetime.now())
        sentiment = news_item.get('sentiment', 'neutral')
        impact_level = news_item.get('impact', 'low')
        category = news_item.get('category', 'markets')

        # Text factors are evaluated over title and summary together.
        text = f"{title} {summary}"

        score = 0.0
        score += self._score_keywords(text)        # max 30
        score += self._score_recency(timestamp)    # max 20
        score += self._score_source(source)        # weight * 10
        score += self._score_engagement(news_item)  # max 15
        score += self._score_sentiment(sentiment)  # max 10
        score += self._score_category(category)    # max 5
        score += self._score_tickers(text)         # max 10
        score += self._score_urgency(text)         # max 10

        # A declared impact level scales the combined score.
        if impact_level == 'high':
            score *= 1.2
        elif impact_level == 'medium':
            score *= 1.1

        score = min(score, 100.0)

        logger.debug("News '%s...' scored: %.2f", title[:50], score)

        return score

    def _score_keywords(self, text: str) -> float:
        """
        Score keyword presence (0-30).

        Each keyword counts once if it occurs anywhere in ``text`` as a
        substring (so 'surge' also matches 'surges'). ``text`` is expected
        to be lower-cased already, as the keyword tables are lower-case.
        """
        critical_hits = sum(1 for kw in self.CRITICAL_KEYWORDS if kw in text)
        high_hits = sum(1 for kw in self.HIGH_IMPACT_KEYWORDS if kw in text)
        medium_hits = sum(1 for kw in self.MEDIUM_IMPACT_KEYWORDS if kw in text)

        score = (
            min(critical_hits * 3.0, 18.0)
            + min(high_hits * 2.0, 8.0)
            + min(medium_hits * 1.0, 4.0)
        )
        return min(score, 30.0)

    def _score_recency(self, timestamp: datetime) -> float:
        """
        Score how recent the news is (1-20).

        Args:
            timestamp: A ``datetime`` (naive or timezone-aware) or an ISO
                8601 string; a trailing 'Z' is accepted as UTC.

        Returns:
            20/18/15/10/5 for items younger than 5/15/30/60/180 minutes,
            1.0 for older items, and a neutral 5.0 when the timestamp
            cannot be interpreted.
        """
        try:
            if isinstance(timestamp, str):
                timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))

            # Compare like with like: aware timestamps against "now" in the
            # same zone, naive timestamps against naive local time. Simply
            # dropping tzinfo would skew the age by the UTC offset.
            if timestamp.tzinfo is not None and timestamp.utcoffset() is not None:
                now = datetime.now(timestamp.tzinfo)
            else:
                now = datetime.now()

            age_minutes = (now - timestamp).total_seconds() / 60
        except (AttributeError, TypeError, ValueError, OverflowError):
            # Unparseable or non-datetime input: best-effort neutral score.
            return 5.0

        if age_minutes < 5:
            return 20.0
        if age_minutes < 15:
            return 18.0
        if age_minutes < 30:
            return 15.0
        if age_minutes < 60:
            return 10.0
        if age_minutes < 180:
            return 5.0
        return 1.0

    def _score_source(self, source: str) -> float:
        """
        Score source credibility as ``weight * 10``.

        The source name is normalized (lower-case, spaces to underscores,
        '/' and '@' removed) and the first ``SOURCE_WEIGHTS`` key found
        inside it as a substring decides the weight; otherwise the
        'default' weight applies.
        """
        normalized = source.lower().replace(' ', '_').replace('/', '').replace('@', '')

        for source_key, weight in self.SOURCE_WEIGHTS.items():
            if source_key in normalized:
                return weight * 10.0

        return self.SOURCE_WEIGHTS['default'] * 10.0

    def _score_engagement(self, news_item: Dict) -> float:
        """
        Score social engagement (0-15).

        Reads 'likes', 'retweets', 'reddit_score' and 'reddit_comments'.
        Each positive counter contributes up to 5 points. If no counter is
        positive, a neutral 5.0 is returned. Missing or ``None`` counters
        are treated as 0.
        """
        score = 0.0
        has_engagement = False

        for field, divisor in self._ENGAGEMENT_FIELDS:
            value = news_item.get(field) or 0
            if value > 0:
                has_engagement = True
                score += min(value / divisor, 5.0)

        if not has_engagement:
            return 5.0

        return min(score, 15.0)

    def _score_sentiment(self, sentiment: str) -> float:
        """Score sentiment: 'negative' 10, 'positive' 8, anything else 3."""
        if sentiment == 'positive':
            return 8.0
        if sentiment == 'negative':
            return 10.0
        return 3.0

    def _score_category(self, category: str) -> float:
        """Score category: macro 5, markets 4, geopolitical 3, other 2."""
        if category == 'macro':
            return 5.0
        if category == 'markets':
            return 4.0
        if category == 'geopolitical':
            return 3.0
        return 2.0

    def _score_tickers(self, text: str) -> float:
        """
        Bonus for mentioning major tickers (2 points each, max 10).

        Matching is case-insensitive but requires the ticker to appear as a
        whole alphanumeric token ('$AAPL', 'aapl,' and 'AAPL' all match).
        Plain substring matching would count ordinary words as tickers,
        e.g. 'earnings' (GS), 'back' (BAC) or 'media' (DIA).
        """
        tokens = set(self._TOKEN_PATTERN.findall(text.upper()))
        ticker_mentions = sum(1 for ticker in self.MAJOR_TICKERS if ticker in tokens)
        return min(ticker_mentions * 2.0, 10.0)

    def _score_urgency(self, text: str) -> float:
        """Bonus for urgency words/emoji (2 points per marker, max 10)."""
        hits = sum(1 for pattern in self._URGENCY_PATTERNS if pattern.search(text))
        return min(hits * 2.0, 10.0)

    def get_breaking_news(self, news_items: List[Dict], top_n: int = 1) -> List[Dict]:
        """
        Identify the top breaking news from a list.

        Args:
            news_items: List of news item dictionaries.
            top_n: Number of top items to return; values below zero are
                treated as zero.

        Returns:
            Copies of the highest-scoring items, best first, each with an
            added 'breaking_score' key. The input dictionaries are not
            modified.
        """
        if not news_items:
            return []

        # A negative slice bound would drop items from the wrong end.
        limit = max(top_n, 0)

        scored_items = [
            {**item, 'breaking_score': self.calculate_impact_score(item)}
            for item in news_items
        ]
        scored_items.sort(key=lambda entry: entry['breaking_score'], reverse=True)
        top_items = scored_items[:limit]

        logger.info("Top %s breaking news:", top_n)
        for rank, entry in enumerate(top_items, 1):
            # The title is optional everywhere else, so do not index it here.
            title = entry.get('title') or ''
            logger.info(" %d. [%.1f] %s...", rank, entry['breaking_score'], title[:60])

        return top_items

    def get_breaking_threshold(self) -> float:
        """Get the minimum score threshold for breaking news display."""
        return 40.0
| |
|
| |
|
| | |
# Process-wide scorer, created on first request by get_breaking_news_scorer().
_scorer_instance = None


def get_breaking_news_scorer() -> BreakingNewsScorer:
    """Return the shared BreakingNewsScorer, creating it on the first call."""
    global _scorer_instance
    if _scorer_instance is not None:
        return _scorer_instance
    _scorer_instance = BreakingNewsScorer()
    return _scorer_instance
| |
|