| """News Intelligence v1.0 β Real-Time News Sentiment + Event Detection |
| FinBERT-based sentiment scoring with event classification. |
| Falls back to regex-based analysis if FinBERT unavailable. |
| """ |
| import re, os, json, requests |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Optional, Tuple |
| import numpy as np |
|
|
| |
# Event taxonomy: maps an event category to pattern fragments matched against
# the lowercased headline+summary by NewsIntelligence.classify_event.
# NOTE: entries are used as raw regex patterns via re.findall ('q[1-4]' is a
# character class) and match substrings — e.g. 'fed' also hits 'federal'.
EVENT_PATTERNS = {
    'earnings': ['earnings', 'quarterly', 'revenue', 'eps', 'profit', 'q[1-4]', 'fiscal'],
    'fed': ['federal reserve', 'fed', 'fomc', 'interest rate', 'rate hike', 'rate cut', 'powell'],
    'cpi': ['cpi', 'inflation', 'consumer price', 'core pce'],
    'jobs': ['jobs report', 'unemployment', 'nfp', 'nonfarm payroll', 'labor'],
    'lawsuit': ['lawsuit', 'sec', 'doj', 'investigation', 'antitrust', 'fine', 'settlement'],
    'merger': ['merger', 'acquisition', 'acquire', 'buyout', 'merging', 'takeover'],
    'dividend': ['dividend', 'buyback', 'share repurchase', 'dividend yield'],
    'split': ['stock split', 'split', 'reverse split'],
    'upgrade': ['upgrade', 'upgraded', 'overweight', 'buy rating', 'price target raised'],
    'downgrade': ['downgrade', 'downgraded', 'underweight', 'sell rating', 'price target cut'],
    'product': ['product launch', 'new product', 'iphone', 'ai model', 'release date'],
    'supply_chain': ['supply chain', 'shortage', 'inventory', 'chip shortage', 'factory'],
    'macro': ['gdp', 'recession', 'economic growth', 'fiscal policy', 'stimulus'],
    'geopolitical': ['war', 'sanctions', 'tension', 'china', 'trade war', 'tariff'],
    'analyst': ['analyst', 'wall street', 'target price', 'consensus'],
}
|
|
# Positive-sentiment lexicon for the rule-based fallback scorer.
# Matched by substring (str.count) on lowercased text, so 'beat' also
# counts 'beats'/'beaten'.
BULLISH_WORDS = [
    'beat', 'strong', 'growth', 'surge', 'rally', 'bullish', 'outperform',
    'exceed', 'record', 'milestone', 'breakthrough', 'partnership', 'launch',
    'innovation', 'momentum', 'premium', 'dominant', 'leader', 'expansion'
]
|
|
# Negative-sentiment lexicon for the rule-based fallback scorer.
# Substring-matched like BULLISH_WORDS — short entries such as 'cut' or
# 'risk' can fire inside longer unrelated words.
BEARISH_WORDS = [
    'miss', 'weak', 'decline', 'drop', 'crash', 'bearish', 'underperform',
    'loss', 'concern', 'warning', 'risk', 'lawsuit', 'investigation',
    'fraud', 'default', 'bankruptcy', 'layoff', 'cut', 'slash', 'downturn',
    'recession', 'contagion', 'crisis', 'collapse'
]
|
|
|
|
| class NewsIntelligence: |
| """Multi-source news sentiment with FinBERT + rule-based fallback.""" |
|
|
| def __init__(self, finbert_available: bool = None, cache_dir: str = ".cache/news"): |
| self.cache_dir = cache_dir |
| os.makedirs(cache_dir, exist_ok=True) |
| self._finbert = None |
| self._tokenizer = None |
| self._sentiment_cache = {} |
|
|
| if finbert_available is None: |
| try: |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification |
| self._tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert") |
| self._finbert = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert") |
| self._finbert.eval() |
| finbert_available = True |
| except Exception: |
| finbert_available = False |
| self.use_finbert = finbert_available |
|
|
| def classify_event(self, headline: str, summary: str = "") -> Tuple[str, float]: |
| """Classify article into event type and severity (0-1).""" |
| text = (headline + " " + summary).lower() |
| scores = {} |
| for event_type, patterns in EVENT_PATTERNS.items(): |
| score = 0 |
| for pat in patterns: |
| count = len(re.findall(pat, text)) |
| score += count |
| if score > 0: |
| scores[event_type] = score |
|
|
| if not scores: |
| return 'general', 0.1 |
|
|
| best = max(scores, key=scores.get) |
| return best, min(1.0, scores[best] * 0.5) |
|
|
| def rule_sentiment(self, headline: str, summary: str = "") -> Dict: |
| """Rule-based sentiment as fallback when FinBERT unavailable.""" |
| text = (headline + " " + summary).lower() |
| bull = sum(text.count(w) for w in BULLISH_WORDS) |
| bear = sum(text.count(w) for w in BEARISH_WORDS) |
| total = bull + bear + 1e-10 |
| |
| sentiment = 50 + (bull - bear) / total * 50 |
| confidence = min(1.0, total * 0.1) |
| return { |
| 'score': max(0, min(100, sentiment)), |
| 'confidence': confidence, |
| 'method': 'rule' |
| } |
|
|
| def finbert_sentiment(self, headline: str, summary: str = "") -> Dict: |
| """FinBERT inference. Returns score 0-100.""" |
| if not self.use_finbert: |
| return self.rule_sentiment(headline, summary) |
|
|
| import torch |
| text = headline |
| if summary: |
| text += ". " + summary[:500] |
|
|
| inputs = self._tokenizer(text, return_tensors="pt", truncation=True, max_length=512) |
| with torch.no_grad(): |
| outputs = self._finbert(**inputs) |
| probs = torch.softmax(outputs.logits, dim=1)[0].numpy() |
|
|
| |
| neg, neu, pos = probs |
| |
| score = 50 + (pos - neg) * 50 |
| confidence = 1 - neu |
|
|
| return { |
| 'score': max(0, min(100, score)), |
| 'confidence': float(confidence), |
| 'probs': {'negative': float(neg), 'neutral': float(neu), 'positive': float(pos)}, |
| 'method': 'finbert' |
| } |
|
|
| def analyze_article(self, headline: str, summary: str = "", |
| timestamp: str = None) -> Dict: |
| """Full article analysis: sentiment + event classification.""" |
| event_type, event_severity = self.classify_event(headline, summary) |
| sentiment = self.finbert_sentiment(headline, summary) |
|
|
| |
| event_sentiment_override = { |
| 'earnings': 0, |
| 'fed': -10, |
| 'lawsuit': -25, |
| 'upgrade': +20, |
| 'downgrade': -20, |
| 'merger': +15, |
| 'dividend': +10, |
| 'product': +15, |
| } |
| adj_score = sentiment['score'] |
| if event_type in event_sentiment_override: |
| adj_score += event_sentiment_override[event_type] |
| sentiment['adjusted_score'] = max(0, min(100, adj_score)) |
| else: |
| sentiment['adjusted_score'] = adj_score |
|
|
| return { |
| 'headline': headline, |
| 'summary': summary[:200] if summary else "", |
| 'timestamp': timestamp or datetime.now().isoformat(), |
| 'sentiment': sentiment, |
| 'event': { |
| 'type': event_type, |
| 'severity': event_severity, |
| } |
| } |
|
|
| def fetch_newsapi(self, query: str, api_key: str = None, days: int = 7) -> List[Dict]: |
| """Fetch news from NewsAPI. Returns list of article analyses.""" |
| if not api_key: |
| api_key = os.environ.get('NEWSAPI_KEY') |
| if not api_key: |
| return self._mock_news(query) |
|
|
| from_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') |
| url = f"https://newsapi.org/v2/everything?q={query}&from={from_date}&sortBy=publishedAt&language=en&apiKey={api_key}" |
|
|
| try: |
| r = requests.get(url, timeout=15) |
| r.raise_for_status() |
| articles = r.json().get('articles', []) |
| results = [] |
| for art in articles[:10]: |
| analysis = self.analyze_article( |
| art.get('title', ''), |
| art.get('description', ''), |
| art.get('publishedAt') |
| ) |
| results.append(analysis) |
| return results |
| except Exception as e: |
| print(f"NewsAPI error: {e}") |
| return self._mock_news(query) |
|
|
| def fetch_yfinance_news(self, ticker: str) -> List[Dict]: |
| """Fetch news from yfinance.""" |
| try: |
| import yfinance as yf |
| t = yf.Ticker(ticker) |
| news = t.news or [] |
| results = [] |
| for item in news[:10]: |
| title = item.get('title', '') or item.get('content', {}).get('title', '') |
| summary = item.get('summary', '') or item.get('content', {}).get('summary', '') |
| analysis = self.analyze_article(title, summary) |
| results.append(analysis) |
| return results |
| except Exception as e: |
| print(f"yfinance news error: {e}") |
| return self._mock_news(ticker) |
|
|
| def aggregate_sentiment(self, articles: List[Dict]) -> Dict: |
| """Aggregate sentiment across articles with recency weighting.""" |
| if not articles: |
| return {'score': 50, 'confidence': 0, 'volume': 0, 'trend': 'neutral'} |
|
|
| scores = [] |
| for art in articles: |
| adj = art['sentiment'].get('adjusted_score', art['sentiment']['score']) |
| conf = art['sentiment'].get('confidence', 0.5) |
| scores.append((adj, conf)) |
|
|
| if not scores: |
| return {'score': 50, 'confidence': 0, 'volume': 0, 'trend': 'neutral'} |
|
|
| |
| total_weight = sum(conf for _, conf in scores) + 1e-10 |
| weighted_score = sum(s * c for s, c in scores) / total_weight |
|
|
| |
| bullish = sum(1 for s, _ in scores if s > 55) |
| bearish = sum(1 for s, _ in scores if s < 45) |
| neutral = sum(1 for s, _ in scores if 45 <= s <= 55) |
|
|
| volume = len(scores) |
| if bullish > bearish * 2: |
| trend = 'strong_bullish' |
| elif bullish > bearish: |
| trend = 'bullish' |
| elif bearish > bullish * 2: |
| trend = 'strong_bearish' |
| elif bearish > bullish: |
| trend = 'bearish' |
| else: |
| trend = 'mixed' |
|
|
| |
| events = [a['event']['type'] for a in articles] |
| event_counts = {} |
| for e in events: |
| event_counts[e] = event_counts.get(e, 0) + 1 |
| dominant_event = max(event_counts, key=event_counts.get) if event_counts else 'general' |
|
|
| return { |
| 'score': round(weighted_score, 1), |
| 'confidence': round(total_weight / volume, 2), |
| 'volume': volume, |
| 'trend': trend, |
| 'bullish_count': bullish, |
| 'bearish_count': bearish, |
| 'neutral_count': neutral, |
| 'dominant_event': dominant_event, |
| 'event_counts': event_counts, |
| } |
|
|
| def _mock_news(self, query: str) -> List[Dict]: |
| """Mock news for testing without API keys.""" |
| mock = [ |
| f"{query} beats earnings expectations, revenue surges 15%", |
| f"{query} announces new AI product partnership", |
| f"Analysts upgrade {query} to overweight, target raised to $500", |
| f"{query} faces supply chain headwinds in Q3", |
| f"{query} maintains guidance despite macro uncertainty", |
| ] |
| return [self.analyze_article(h) for h in mock] |
|
|
| def get_full_analysis(self, ticker: str, market: str = 'US', period_days: int = 7) -> Dict: |
| """Full news intelligence pipeline for a ticker.""" |
| |
| articles = self.fetch_yfinance_news(ticker) |
|
|
| |
| if len(articles) < 3: |
| api_articles = self.fetch_newsapi(ticker, days=period_days) |
| articles.extend(api_articles) |
|
|
| |
| seen = set() |
| unique = [] |
| for a in articles: |
| key = a['headline'][:50].lower() |
| if key not in seen: |
| seen.add(key) |
| unique.append(a) |
|
|
| sentiment = self.aggregate_sentiment(unique) |
| sentiment['articles'] = unique[:5] |
| sentiment['ticker'] = ticker |
| sentiment['market'] = market |
| sentiment['timestamp'] = datetime.now().isoformat() |
| return sentiment |
|
|
|
|
if __name__ == '__main__':
    # Smoke-test the pipeline against AAPL and print a short report.
    engine = NewsIntelligence()
    report = engine.get_full_analysis('AAPL')
    print(f"Sentiment Score: {report['score']}/100")
    print(f"Trend: {report['trend']}")
    print(f"Dominant Event: {report['dominant_event']}")
    print(f"Article Count: {report['volume']}")
    top_three = report['articles'][:3]
    for article in top_three:
        print(f"\n π° {article['headline']}")
        print(f"   Score: {article['sentiment']['adjusted_score']:.1f} | Event: {article['event']['type']}")
|
|