File size: 12,583 Bytes
c5a2a63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
"""News Intelligence v1.0 β€” Real-Time News Sentiment + Event Detection
FinBERT-based sentiment scoring with event classification.
Falls back to regex-based analysis if FinBERT unavailable.
"""
import re, os, json, requests
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import numpy as np

# ── Event detection keywords ─────────────────────────────────
EVENT_PATTERNS = {
    'earnings':      ['earnings', 'quarterly', 'revenue', 'eps', 'profit', 'q[1-4]', 'fiscal'],
    'fed':           ['federal reserve', 'fed', 'fomc', 'interest rate', 'rate hike', 'rate cut', 'powell'],
    'cpi':           ['cpi', 'inflation', 'consumer price', 'core pce'],
    'jobs':          ['jobs report', 'unemployment', 'nfp', 'nonfarm payroll', 'labor'],
    'lawsuit':       ['lawsuit', 'sec', 'doj', 'investigation', 'antitrust', 'fine', 'settlement'],
    'merger':        ['merger', 'acquisition', 'acquire', 'buyout', 'merging', 'takeover'],
    'dividend':      ['dividend', 'buyback', 'share repurchase', 'dividend yield'],
    'split':         ['stock split', 'split', 'reverse split'],
    'upgrade':       ['upgrade', 'upgraded', 'overweight', 'buy rating', 'price target raised'],
    'downgrade':     ['downgrade', 'downgraded', 'underweight', 'sell rating', 'price target cut'],
    'product':       ['product launch', 'new product', 'iphone', 'ai model', 'release date'],
    'supply_chain':  ['supply chain', 'shortage', 'inventory', 'chip shortage', 'factory'],
    'macro':         ['gdp', 'recession', 'economic growth', 'fiscal policy', 'stimulus'],
    'geopolitical':  ['war', 'sanctions', 'tension', 'china', 'trade war', 'tariff'],
    'analyst':       ['analyst', 'wall street', 'target price', 'consensus'],
}

BULLISH_WORDS = [
    'beat', 'strong', 'growth', 'surge', 'rally', 'bullish', 'outperform',
    'exceed', 'record', 'milestone', 'breakthrough', 'partnership', 'launch',
    'innovation', 'momentum', 'premium', 'dominant', 'leader', 'expansion'
]

BEARISH_WORDS = [
    'miss', 'weak', 'decline', 'drop', 'crash', 'bearish', 'underperform',
    'loss', 'concern', 'warning', 'risk', 'lawsuit', 'investigation',
    'fraud', 'default', 'bankruptcy', 'layoff', 'cut', 'slash', 'downturn',
    'recession', 'contagion', 'crisis', 'collapse'
]


class NewsIntelligence:
    """Multi-source news sentiment with FinBERT + rule-based fallback."""

    def __init__(self, finbert_available: bool = None, cache_dir: str = ".cache/news"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
        self._finbert = None
        self._tokenizer = None
        self._sentiment_cache = {}  # ticker -> {date: score}

        if finbert_available is None:
            try:
                from transformers import AutoTokenizer, AutoModelForSequenceClassification
                self._tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
                self._finbert = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert")
                self._finbert.eval()
                finbert_available = True
            except Exception:
                finbert_available = False
        self.use_finbert = finbert_available

    def classify_event(self, headline: str, summary: str = "") -> Tuple[str, float]:
        """Classify article into event type and severity (0-1)."""
        text = (headline + " " + summary).lower()
        scores = {}
        for event_type, patterns in EVENT_PATTERNS.items():
            score = 0
            for pat in patterns:
                count = len(re.findall(pat, text))
                score += count
            if score > 0:
                scores[event_type] = score

        if not scores:
            return 'general', 0.1

        best = max(scores, key=scores.get)
        return best, min(1.0, scores[best] * 0.5)

    def rule_sentiment(self, headline: str, summary: str = "") -> Dict:
        """Rule-based sentiment as fallback when FinBERT unavailable."""
        text = (headline + " " + summary).lower()
        bull = sum(text.count(w) for w in BULLISH_WORDS)
        bear = sum(text.count(w) for w in BEARISH_WORDS)
        total = bull + bear + 1e-10
        # Map to 0-100 scale
        sentiment = 50 + (bull - bear) / total * 50
        confidence = min(1.0, total * 0.1)
        return {
            'score': max(0, min(100, sentiment)),
            'confidence': confidence,
            'method': 'rule'
        }

    def finbert_sentiment(self, headline: str, summary: str = "") -> Dict:
        """FinBERT inference. Returns score 0-100."""
        if not self.use_finbert:
            return self.rule_sentiment(headline, summary)

        import torch
        text = headline
        if summary:
            text += ". " + summary[:500]

        inputs = self._tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        with torch.no_grad():
            outputs = self._finbert(**inputs)
            probs = torch.softmax(outputs.logits, dim=1)[0].numpy()

        # FinBERT: [negative, neutral, positive]
        neg, neu, pos = probs
        # Map to 0-100
        score = 50 + (pos - neg) * 50
        confidence = 1 - neu  # Higher confidence when less neutral

        return {
            'score': max(0, min(100, score)),
            'confidence': float(confidence),
            'probs': {'negative': float(neg), 'neutral': float(neu), 'positive': float(pos)},
            'method': 'finbert'
        }

    def analyze_article(self, headline: str, summary: str = "",
                        timestamp: str = None) -> Dict:
        """Full article analysis: sentiment + event classification."""
        event_type, event_severity = self.classify_event(headline, summary)
        sentiment = self.finbert_sentiment(headline, summary)

        # Adjust sentiment for event context
        event_sentiment_override = {
            'earnings': 0,
            'fed': -10,
            'lawsuit': -25,
            'upgrade': +20,
            'downgrade': -20,
            'merger': +15,
            'dividend': +10,
            'product': +15,
        }
        adj_score = sentiment['score']
        if event_type in event_sentiment_override:
            adj_score += event_sentiment_override[event_type]
            sentiment['adjusted_score'] = max(0, min(100, adj_score))
        else:
            sentiment['adjusted_score'] = adj_score

        return {
            'headline': headline,
            'summary': summary[:200] if summary else "",
            'timestamp': timestamp or datetime.now().isoformat(),
            'sentiment': sentiment,
            'event': {
                'type': event_type,
                'severity': event_severity,
            }
        }

    def fetch_newsapi(self, query: str, api_key: str = None, days: int = 7) -> List[Dict]:
        """Fetch news from NewsAPI. Returns list of article analyses."""
        if not api_key:
            api_key = os.environ.get('NEWSAPI_KEY')
        if not api_key:
            return self._mock_news(query)

        from_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
        url = f"https://newsapi.org/v2/everything?q={query}&from={from_date}&sortBy=publishedAt&language=en&apiKey={api_key}"

        try:
            r = requests.get(url, timeout=15)
            r.raise_for_status()
            articles = r.json().get('articles', [])
            results = []
            for art in articles[:10]:
                analysis = self.analyze_article(
                    art.get('title', ''),
                    art.get('description', ''),
                    art.get('publishedAt')
                )
                results.append(analysis)
            return results
        except Exception as e:
            print(f"NewsAPI error: {e}")
            return self._mock_news(query)

    def fetch_yfinance_news(self, ticker: str) -> List[Dict]:
        """Fetch news from yfinance."""
        try:
            import yfinance as yf
            t = yf.Ticker(ticker)
            news = t.news or []
            results = []
            for item in news[:10]:
                title = item.get('title', '') or item.get('content', {}).get('title', '')
                summary = item.get('summary', '') or item.get('content', {}).get('summary', '')
                analysis = self.analyze_article(title, summary)
                results.append(analysis)
            return results
        except Exception as e:
            print(f"yfinance news error: {e}")
            return self._mock_news(ticker)

    def aggregate_sentiment(self, articles: List[Dict]) -> Dict:
        """Aggregate sentiment across articles with recency weighting."""
        if not articles:
            return {'score': 50, 'confidence': 0, 'volume': 0, 'trend': 'neutral'}

        scores = []
        for art in articles:
            adj = art['sentiment'].get('adjusted_score', art['sentiment']['score'])
            conf = art['sentiment'].get('confidence', 0.5)
            scores.append((adj, conf))

        if not scores:
            return {'score': 50, 'confidence': 0, 'volume': 0, 'trend': 'neutral'}

        # Weighted average by confidence
        total_weight = sum(conf for _, conf in scores) + 1e-10
        weighted_score = sum(s * c for s, c in scores) / total_weight

        # Count by sentiment
        bullish = sum(1 for s, _ in scores if s > 55)
        bearish = sum(1 for s, _ in scores if s < 45)
        neutral = sum(1 for s, _ in scores if 45 <= s <= 55)

        volume = len(scores)
        if bullish > bearish * 2:
            trend = 'strong_bullish'
        elif bullish > bearish:
            trend = 'bullish'
        elif bearish > bullish * 2:
            trend = 'strong_bearish'
        elif bearish > bullish:
            trend = 'bearish'
        else:
            trend = 'mixed'

        # Dominant event
        events = [a['event']['type'] for a in articles]
        event_counts = {}
        for e in events:
            event_counts[e] = event_counts.get(e, 0) + 1
        dominant_event = max(event_counts, key=event_counts.get) if event_counts else 'general'

        return {
            'score': round(weighted_score, 1),
            'confidence': round(total_weight / volume, 2),
            'volume': volume,
            'trend': trend,
            'bullish_count': bullish,
            'bearish_count': bearish,
            'neutral_count': neutral,
            'dominant_event': dominant_event,
            'event_counts': event_counts,
        }

    def _mock_news(self, query: str) -> List[Dict]:
        """Mock news for testing without API keys."""
        mock = [
            f"{query} beats earnings expectations, revenue surges 15%",
            f"{query} announces new AI product partnership",
            f"Analysts upgrade {query} to overweight, target raised to $500",
            f"{query} faces supply chain headwinds in Q3",
            f"{query} maintains guidance despite macro uncertainty",
        ]
        return [self.analyze_article(h) for h in mock]

    def get_full_analysis(self, ticker: str, market: str = 'US', period_days: int = 7) -> Dict:
        """Full news intelligence pipeline for a ticker."""
        # Try yfinance first
        articles = self.fetch_yfinance_news(ticker)

        # If insufficient, try NewsAPI
        if len(articles) < 3:
            api_articles = self.fetch_newsapi(ticker, days=period_days)
            articles.extend(api_articles)

        # Deduplicate by headline
        seen = set()
        unique = []
        for a in articles:
            key = a['headline'][:50].lower()
            if key not in seen:
                seen.add(key)
                unique.append(a)

        sentiment = self.aggregate_sentiment(unique)
        sentiment['articles'] = unique[:5]  # Top 5
        sentiment['ticker'] = ticker
        sentiment['market'] = market
        sentiment['timestamp'] = datetime.now().isoformat()
        return sentiment


if __name__ == '__main__':
    ni = NewsIntelligence()
    result = ni.get_full_analysis('AAPL')
    print(f"Sentiment Score: {result['score']}/100")
    print(f"Trend: {result['trend']}")
    print(f"Dominant Event: {result['dominant_event']}")
    print(f"Article Count: {result['volume']}")
    for art in result['articles'][:3]:
        print(f"\n  πŸ“° {art['headline']}")
        print(f"     Score: {art['sentiment']['adjusted_score']:.1f} | Event: {art['event']['type']}")