# alphaforge-quant-system / news_intelligence.py
# Author: Premchan369
# Commit: Add news intelligence with FinBERT sentiment + event detection
# Revision: c5a2a63 (verified)
"""News Intelligence v1.0 β€” Real-Time News Sentiment + Event Detection
FinBERT-based sentiment scoring with event classification.
Falls back to regex-based analysis if FinBERT unavailable.
"""
import re, os, json, requests
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import numpy as np
# ── Event detection keywords ─────────────────────────────────
# Maps an event label to the lowercase terms that signal it. Entries are
# applied as regular expressions (note 'q[1-4]'), so regex metacharacters in
# a term are significant. Key order matters: on a tie the earliest key wins.
EVENT_PATTERNS = {
    'earnings': [
        'earnings', 'quarterly', 'revenue', 'eps', 'profit', 'q[1-4]', 'fiscal',
    ],
    'fed': [
        'federal reserve', 'fed', 'fomc', 'interest rate', 'rate hike',
        'rate cut', 'powell',
    ],
    'cpi': [
        'cpi', 'inflation', 'consumer price', 'core pce',
    ],
    'jobs': [
        'jobs report', 'unemployment', 'nfp', 'nonfarm payroll', 'labor',
    ],
    'lawsuit': [
        'lawsuit', 'sec', 'doj', 'investigation', 'antitrust', 'fine',
        'settlement',
    ],
    'merger': [
        'merger', 'acquisition', 'acquire', 'buyout', 'merging', 'takeover',
    ],
    'dividend': [
        'dividend', 'buyback', 'share repurchase', 'dividend yield',
    ],
    'split': [
        'stock split', 'split', 'reverse split',
    ],
    'upgrade': [
        'upgrade', 'upgraded', 'overweight', 'buy rating',
        'price target raised',
    ],
    'downgrade': [
        'downgrade', 'downgraded', 'underweight', 'sell rating',
        'price target cut',
    ],
    'product': [
        'product launch', 'new product', 'iphone', 'ai model', 'release date',
    ],
    'supply_chain': [
        'supply chain', 'shortage', 'inventory', 'chip shortage', 'factory',
    ],
    'macro': [
        'gdp', 'recession', 'economic growth', 'fiscal policy', 'stimulus',
    ],
    'geopolitical': [
        'war', 'sanctions', 'tension', 'china', 'trade war', 'tariff',
    ],
    'analyst': [
        'analyst', 'wall street', 'target price', 'consensus',
    ],
}
# Lowercase terms counted as positive evidence by the rule-based scorer.
BULLISH_WORDS = [
    'beat', 'strong', 'growth', 'surge', 'rally',
    'bullish', 'outperform', 'exceed', 'record', 'milestone',
    'breakthrough', 'partnership', 'launch', 'innovation', 'momentum',
    'premium', 'dominant', 'leader', 'expansion',
]
# Lowercase terms counted as negative evidence by the rule-based scorer.
BEARISH_WORDS = [
    'miss', 'weak', 'decline', 'drop', 'crash', 'bearish',
    'underperform', 'loss', 'concern', 'warning', 'risk', 'lawsuit',
    'investigation', 'fraud', 'default', 'bankruptcy', 'layoff', 'cut',
    'slash', 'downturn', 'recession', 'contagion', 'crisis', 'collapse',
]
class NewsIntelligence:
    """Multi-source news sentiment with FinBERT + rule-based fallback.

    Each article is scored on a 0-100 scale (50 = neutral) and tagged with an
    event type taken from ``EVENT_PATTERNS``. Articles come from yfinance
    and/or NewsAPI; canned mock headlines are used only when no real source
    can be reached.
    """

    # Points added to an article's 0-100 sentiment score when it is classified
    # as the given event type. Event types not listed leave the score as-is.
    _EVENT_SCORE_ADJUSTMENT = {
        'earnings': 0,
        'fed': -10,
        'lawsuit': -25,
        'upgrade': +20,
        'downgrade': -20,
        'merger': +15,
        'dividend': +10,
        'product': +15,
    }

    def __init__(self, finbert_available: Optional[bool] = None,
                 cache_dir: str = ".cache/news"):
        """Create the analyzer and, unless disabled, try to load FinBERT.

        Args:
            finbert_available: ``False`` skips FinBERT entirely. ``None``
                (auto-detect) and ``True`` both attempt to load the
                ``ProsusAI/finbert`` model; if loading fails the instance
                falls back to rule-based scoring.
            cache_dir: Directory created on construction (``exist_ok=True``).
        """
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
        self._finbert = None
        self._tokenizer = None
        self._sentiment_cache = {}  # ticker -> {date: score}

        if finbert_available is None or finbert_available:
            explicitly_requested = bool(finbert_available)
            try:
                from transformers import AutoTokenizer, AutoModelForSequenceClassification
                self._tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
                self._finbert = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert")
                self._finbert.eval()
                finbert_available = True
            except Exception as exc:
                # Auto-detect stays silent; an explicit request that cannot be
                # honoured is reported so the fallback is not a surprise.
                if explicitly_requested:
                    print(f"FinBERT unavailable, using rule-based sentiment: {exc}")
                self._tokenizer = None
                self._finbert = None
                finbert_available = False
        self.use_finbert = bool(finbert_available)

    @staticmethod
    def _count_keyword(pattern: str, text: str) -> int:
        """Count whole-word occurrences of the regex ``pattern`` in ``text``.

        A match must start and end on a word boundary, optionally followed by
        a common inflection ending (s, es, d, ed, ing). This keeps 'beat'
        matching 'beats' while stopping 'war' from matching 'warning' or
        'sec' from matching 'second'.
        """
        return len(re.findall(rf"\b(?:{pattern})(?:s|es|d|ed|ing)?\b", text))

    def classify_event(self, headline: str, summary: str = "") -> Tuple[str, float]:
        """Classify article into event type and severity (0-1).

        Returns ``('general', 0.1)`` when no event keyword is found. Otherwise
        returns the event type with the most keyword hits (ties go to the type
        listed first in ``EVENT_PATTERNS``) and ``min(1.0, hits * 0.5)``.
        """
        text = ((headline or "") + " " + (summary or "")).lower()
        scores = {}
        for event_type, patterns in EVENT_PATTERNS.items():
            # EVENT_PATTERNS entries are regex fragments, so they are not escaped.
            hits = sum(self._count_keyword(pat, text) for pat in patterns)
            if hits > 0:
                scores[event_type] = hits
        if not scores:
            return 'general', 0.1
        best = max(scores, key=scores.get)
        return best, min(1.0, scores[best] * 0.5)

    def rule_sentiment(self, headline: str, summary: str = "") -> Dict:
        """Rule-based sentiment as fallback when FinBERT unavailable.

        Counts bullish and bearish keywords. The score is
        ``50 + (bull - bear) / (bull + bear) * 50`` and confidence grows by
        0.1 per keyword hit, capped at 1.0. With no hits the result is a
        neutral 50 with zero confidence.
        """
        text = ((headline or "") + " " + (summary or "")).lower()
        # The word lists are plain words, so escape them before regex use.
        bull = sum(self._count_keyword(re.escape(w), text) for w in BULLISH_WORDS)
        bear = sum(self._count_keyword(re.escape(w), text) for w in BEARISH_WORDS)
        hits = bull + bear
        if hits == 0:
            return {'score': 50.0, 'confidence': 0.0, 'method': 'rule'}
        sentiment = 50 + (bull - bear) / hits * 50
        return {
            'score': max(0.0, min(100.0, sentiment)),
            'confidence': min(1.0, hits * 0.1),
            'method': 'rule'
        }

    def finbert_sentiment(self, headline: str, summary: str = "") -> Dict:
        """FinBERT inference. Returns score 0-100.

        Falls back to :meth:`rule_sentiment` when FinBERT is not loaded. The
        score is ``50 + (P(positive) - P(negative)) * 50`` and confidence is
        ``1 - P(neutral)``.
        """
        if not self.use_finbert or self._finbert is None or self._tokenizer is None:
            return self.rule_sentiment(headline, summary)
        import torch
        text = headline or ""
        if summary:
            text += ". " + summary[:500]
        inputs = self._tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        with torch.no_grad():
            outputs = self._finbert(**inputs)
        # .tolist() yields plain Python floats, keeping the result JSON-friendly.
        probs = torch.softmax(outputs.logits, dim=1)[0].tolist()

        # Read the class order from the model config instead of assuming it.
        # NOTE(review): the published ProsusAI/finbert config is believed to
        # list positive, negative, neutral (not negative, neutral, positive)
        # - confirm against the model's config.json.
        id2label = getattr(getattr(self._finbert, "config", None), "id2label", None) or {}
        by_label = {}
        for idx, prob in enumerate(probs):
            label = id2label.get(idx, id2label.get(str(idx), ""))
            by_label[str(label).lower()] = prob
        if {'negative', 'neutral', 'positive'} <= set(by_label):
            neg, neu, pos = by_label['negative'], by_label['neutral'], by_label['positive']
        else:
            # No usable label names: keep the historical positional assumption.
            neg, neu, pos = probs

        score = 50 + (pos - neg) * 50
        confidence = 1 - neu  # Higher confidence when less neutral
        return {
            'score': float(max(0.0, min(100.0, score))),
            'confidence': float(confidence),
            'probs': {'negative': float(neg), 'neutral': float(neu), 'positive': float(pos)},
            'method': 'finbert'
        }

    def analyze_article(self, headline: str, summary: str = "",
                        timestamp: Optional[str] = None) -> Dict:
        """Full article analysis: sentiment + event classification.

        ``None`` for ``headline`` or ``summary`` is treated as an empty string
        (news feeds routinely send null fields). The returned ``sentiment``
        dict carries an ``adjusted_score``: the raw score shifted by the
        event-type adjustment and clamped to 0-100. ``timestamp`` defaults to
        the current local time in ISO format.
        """
        headline = headline or ""
        summary = summary or ""
        event_type, event_severity = self.classify_event(headline, summary)
        sentiment = self.finbert_sentiment(headline, summary)

        # Adjust sentiment for event context
        adj_score = sentiment['score']
        if event_type in self._EVENT_SCORE_ADJUSTMENT:
            adj_score = max(0, min(100, adj_score + self._EVENT_SCORE_ADJUSTMENT[event_type]))
        sentiment['adjusted_score'] = adj_score

        return {
            'headline': headline,
            'summary': summary[:200],
            'timestamp': timestamp or datetime.now().isoformat(),
            'sentiment': sentiment,
            'event': {
                'type': event_type,
                'severity': event_severity,
            }
        }

    def _try_newsapi(self, query: str, api_key: Optional[str] = None,
                     days: int = 7) -> Optional[List[Dict]]:
        """Query NewsAPI; return analyses, or ``None`` if unavailable.

        ``None`` means "no API key" or "request failed"; an empty list means
        the request succeeded but returned no articles.
        """
        api_key = api_key or os.environ.get('NEWSAPI_KEY')
        if not api_key:
            return None
        from_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
        try:
            r = requests.get(
                "https://newsapi.org/v2/everything",
                # params= URL-encodes the query instead of pasting it raw.
                params={
                    'q': query,
                    'from': from_date,
                    'sortBy': 'publishedAt',
                    'language': 'en',
                },
                # Header auth keeps the key out of the URL, and therefore out
                # of the error message printed below.
                headers={'X-Api-Key': api_key},
                timeout=15,
            )
            r.raise_for_status()
            articles = r.json().get('articles') or []
            return [
                self.analyze_article(
                    art.get('title') or '',
                    art.get('description') or '',
                    art.get('publishedAt'),
                )
                for art in articles[:10]
            ]
        except Exception as e:
            print(f"NewsAPI error: {e}")
            return None

    def fetch_newsapi(self, query: str, api_key: Optional[str] = None,
                      days: int = 7) -> List[Dict]:
        """Fetch news from NewsAPI. Returns list of article analyses.

        The key comes from ``api_key`` or the ``NEWSAPI_KEY`` environment
        variable. At most the first 10 articles are analysed. Falls back to
        mock headlines when no key is set or the request fails.
        """
        results = self._try_newsapi(query, api_key, days)
        return self._mock_news(query) if results is None else results

    def _try_yfinance(self, ticker: str) -> Optional[List[Dict]]:
        """Read ``yfinance`` news; return analyses, or ``None`` on any failure."""
        try:
            import yfinance as yf
            news = yf.Ticker(ticker).news or []
            results = []
            for item in news[:10]:
                # Title/summary may sit at the top level or under 'content'.
                content = item.get('content')
                if not isinstance(content, dict):
                    content = {}
                title = item.get('title') or content.get('title') or ''
                summary = item.get('summary') or content.get('summary') or ''
                results.append(self.analyze_article(title, summary))
            return results
        except Exception as e:
            print(f"yfinance news error: {e}")
            return None

    def fetch_yfinance_news(self, ticker: str) -> List[Dict]:
        """Fetch news from yfinance.

        At most the first 10 items are analysed. Falls back to mock headlines
        when yfinance is missing or raises.
        """
        results = self._try_yfinance(ticker)
        return self._mock_news(ticker) if results is None else results

    def aggregate_sentiment(self, articles: List[Dict]) -> Dict:
        """Aggregate sentiment across articles, weighted by confidence.

        Each article contributes its ``adjusted_score`` (or ``score``)
        weighted by its ``confidence`` (default 0.5). If every confidence is
        effectively zero, the plain mean is used so the score does not
        collapse toward 0. Articles count as bullish above 55, bearish below
        45, and neutral in between. An empty input yields a neutral result
        with the same keys as a populated one.
        """
        if not articles:
            return {
                'score': 50,
                'confidence': 0,
                'volume': 0,
                'trend': 'neutral',
                'bullish_count': 0,
                'bearish_count': 0,
                'neutral_count': 0,
                'dominant_event': 'general',
                'event_counts': {},
            }

        scores = [
            (
                art['sentiment'].get('adjusted_score', art['sentiment']['score']),
                art['sentiment'].get('confidence', 0.5),
            )
            for art in articles
        ]
        volume = len(scores)

        # Weighted average by confidence
        total_weight = sum(conf for _, conf in scores)
        if total_weight > 1e-9:
            weighted_score = sum(s * c for s, c in scores) / total_weight
        else:
            # No usable weights: every article counts equally.
            weighted_score = sum(s for s, _ in scores) / volume

        # Count by sentiment
        bullish = sum(1 for s, _ in scores if s > 55)
        bearish = sum(1 for s, _ in scores if s < 45)
        neutral = sum(1 for s, _ in scores if 45 <= s <= 55)

        if bullish > bearish * 2:
            trend = 'strong_bullish'
        elif bullish > bearish:
            trend = 'bullish'
        elif bearish > bullish * 2:
            trend = 'strong_bearish'
        elif bearish > bullish:
            trend = 'bearish'
        else:
            trend = 'mixed'

        # Dominant event
        event_counts = {}
        for art in articles:
            event = art['event']['type']
            event_counts[event] = event_counts.get(event, 0) + 1
        dominant_event = max(event_counts, key=event_counts.get) if event_counts else 'general'

        return {
            'score': round(weighted_score, 1),
            'confidence': round(total_weight / volume, 2),
            'volume': volume,
            'trend': trend,
            'bullish_count': bullish,
            'bearish_count': bearish,
            'neutral_count': neutral,
            'dominant_event': dominant_event,
            'event_counts': event_counts,
        }

    def _mock_news(self, query: str) -> List[Dict]:
        """Mock news for testing without API keys.

        The headlines are fabricated templates with ``query`` substituted in;
        they carry no real market information.
        """
        mock = [
            f"{query} beats earnings expectations, revenue surges 15%",
            f"{query} announces new AI product partnership",
            f"Analysts upgrade {query} to overweight, target raised to $500",
            f"{query} faces supply chain headwinds in Q3",
            f"{query} maintains guidance despite macro uncertainty",
        ]
        return [self.analyze_article(h) for h in mock]

    def get_full_analysis(self, ticker: str, market: str = 'US', period_days: int = 7) -> Dict:
        """Full news intelligence pipeline for a ticker.

        Tries yfinance first and tops up from NewsAPI when fewer than three
        articles were found. Mock headlines are used only when neither source
        could be reached, and are never mixed with real articles. Articles
        are de-duplicated on the first 50 characters of the lowercased
        headline.

        Returns the :meth:`aggregate_sentiment` dict extended with
        ``articles`` (first five unique articles), ``ticker``, ``market``,
        ``timestamp`` and ``is_mock`` (True when the data is fabricated).
        """
        # Try yfinance first
        yf_articles = self._try_yfinance(ticker)
        articles = list(yf_articles or [])

        # If insufficient, try NewsAPI
        api_articles = None
        if len(articles) < 3:
            api_articles = self._try_newsapi(ticker, days=period_days)
            articles.extend(api_articles or [])

        # Only fabricate data when every source was unreachable.
        used_mock = yf_articles is None and api_articles is None
        if used_mock:
            articles = self._mock_news(ticker)

        # Deduplicate by headline
        seen = set()
        unique = []
        for a in articles:
            key = a['headline'][:50].lower()
            if key not in seen:
                seen.add(key)
                unique.append(a)

        sentiment = self.aggregate_sentiment(unique)
        sentiment['articles'] = unique[:5]  # Top 5
        sentiment['ticker'] = ticker
        sentiment['market'] = market
        sentiment['timestamp'] = datetime.now().isoformat()
        sentiment['is_mock'] = used_mock
        return sentiment
# Demo entry point: run the full pipeline for AAPL and print a short report.
if __name__ == '__main__':
    ni = NewsIntelligence()
    result = ni.get_full_analysis('AAPL')
    print(f"Sentiment Score: {result['score']}/100")
    print(f"Trend: {result['trend']}")
    # .get(): a result built from zero articles may not carry this key.
    print(f"Dominant Event: {result.get('dominant_event', 'general')}")
    print(f"Article Count: {result['volume']}")
    for art in result['articles'][:3]:
        print(f"\n 📰 {art['headline']}")
        print(f" Score: {art['sentiment']['adjusted_score']:.1f} | Event: {art['event']['type']}")