| """News Data Integration - Real-Time Sentiment Pipeline |
| |
| Connects to real news APIs and RSS feeds for live sentiment signals. |
| Replaces synthetic news with actual financial headlines. |
| |
| Supports: |
| - NewsAPI (https://newsapi.org/) - Free tier available |
| - RSS feeds (Yahoo Finance, Seeking Alpha, etc.) |
| - GDELT Project (global news database) |
| - Reddit/StockTwits social feeds |
| """ |
| import numpy as np |
| import pandas as pd |
| from datetime import datetime, timedelta |
| from typing import Dict, List, Optional, Tuple |
| import time |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
try:
    import feedparser
    FEEDPARSER_AVAILABLE = True
except ImportError:
    FEEDPARSER_AVAILABLE = False


class NewsAPIClient:
    """
    NewsAPI.org client for financial news retrieval.

    Free tier: 100 requests/day
    Paid tier: $449/month for 1M requests

    Use free tier for prototyping, upgrade for production.
    """

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key
        self.base_url = "https://newsapi.org/v2"
        self.last_request_time = 0
        self.min_interval = 1.2

    def _rate_limit(self):
        """Enforce rate limiting"""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request_time = time.time()

    def fetch_everything(self,
                         query: str,
                         from_date: Optional[str] = None,
                         to_date: Optional[str] = None,
                         language: str = 'en',
                         sort_by: str = 'publishedAt',
                         page_size: int = 100,
                         page: int = 1) -> List[Dict]:
        """
        Fetch news articles matching query.

        Args:
            query: Search query (e.g., "AAPL Apple stock earnings")
            from_date: Start date (YYYY-MM-DD)
            to_date: End date (YYYY-MM-DD)
            language: 'en', 'es', 'fr', etc.
            sort_by: 'relevancy', 'popularity', 'publishedAt'
            page_size: Max 100
            page: Page number
        """
        if self.api_key is None:
            print("WARNING: No API key provided. Using mock data.")
            return self._mock_news(query)

        try:
            import requests
        except ImportError:
            print("WARNING: requests library not available. Using mock data.")
            return self._mock_news(query)

        self._rate_limit()

        params = {
            'q': query,
            'apiKey': self.api_key,
            'language': language,
            'sortBy': sort_by,
            'pageSize': min(page_size, 100),
            'page': page
        }

        if from_date:
            params['from'] = from_date
        if to_date:
            params['to'] = to_date

        try:
            response = requests.get(
                f"{self.base_url}/everything",
                params=params,
                timeout=30
            )
            response.raise_for_status()
            data = response.json()

            if data.get('status') != 'ok':
                print(f"API Error: {data.get('message', 'Unknown error')}")
                return self._mock_news(query)

            articles = data.get('articles', [])

            return [{
                'title': a.get('title', ''),
                'description': a.get('description', ''),
                'content': a.get('content', ''),
                'published_at': a.get('publishedAt', ''),
                'source': a.get('source', {}).get('name', 'Unknown'),
                'url': a.get('url', ''),
                'author': a.get('author', '')
            } for a in articles]

        except Exception as e:
            print(f"Error fetching news: {e}")
            return self._mock_news(query)

    def fetch_for_ticker(self,
                         ticker: str,
                         company_name: str,
                         from_date: Optional[str] = None,
                         to_date: Optional[str] = None,
                         page_size: int = 100) -> pd.DataFrame:
        """
        Fetch news for a specific ticker and return formatted DataFrame.
        """
        query = f"{ticker} {company_name} stock"
        articles = self.fetch_everything(
            query=query,
            from_date=from_date,
            to_date=to_date,
            page_size=page_size
        )

        df = pd.DataFrame(articles)
        if df.empty:
            return df

        df['ticker'] = ticker
        df['query'] = query

        # Combine title, description, and content into a single text field.
        df['text'] = (df['title'].fillna('') + '. '
                      + df['description'].fillna('') + ' '
                      + df['content'].fillna(''))
        df['text'] = df['text'].str.strip()

        # Parse publication timestamps; unparseable values become NaT.
        df['date'] = pd.to_datetime(df['published_at'], errors='coerce')

        return df

    def fetch_multiple_tickers(self,
                               ticker_map: Dict[str, str],
                               from_date: Optional[str] = None,
                               to_date: Optional[str] = None) -> pd.DataFrame:
        """
        Fetch news for multiple tickers.

        Args:
            ticker_map: {ticker: company_name}
        """
        all_news = []

        for ticker, company in ticker_map.items():
            print(f"Fetching news for {ticker} ({company})...")
            try:
                df = self.fetch_for_ticker(
                    ticker, company, from_date, to_date
                )
                all_news.append(df)
            except Exception as e:
                print(f" Error for {ticker}: {e}")

        if all_news:
            return pd.concat(all_news, ignore_index=True)
        return pd.DataFrame()

    def _mock_news(self, query: str) -> List[Dict]:
        """Generate mock news for testing without API key"""
        import random

        templates = [
            {"title": "{company} reports strong quarterly earnings, beating expectations",
             "sentiment": "positive"},
            {"title": "{company} faces regulatory scrutiny over data practices",
             "sentiment": "negative"},
            {"title": "Analysts upgrade {company} to buy rating",
             "sentiment": "positive"},
            {"title": "{company} announces major product launch",
             "sentiment": "positive"},
            {"title": "{company} stock falls amid market volatility",
             "sentiment": "negative"},
            {"title": "Market awaits {company} earnings report next week",
             "sentiment": "neutral"},
        ]

        company = query.split()[0] if query else "Company"

        articles = []
        for i, template in enumerate(random.sample(templates, min(3, len(templates)))):
            articles.append({
                'title': template['title'].format(company=company),
                'description': f"Analysis of {company} stock performance.",
                'content': f"Detailed article about {company} and market conditions.",
                'published_at': (datetime.now() - timedelta(hours=i * 6)).isoformat(),
                'source': f'MockSource{i}',
                'url': f'https://example.com/article{i}',
                'author': 'MockAuthor'
            })

        return articles
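

# Hedged usage sketch for NewsAPIClient. The NEWSAPI_KEY environment variable is an
# illustrative assumption, not something this module reads elsewhere; without a key the
# client falls back to _mock_news, so the sketch still runs end to end.
def example_newsapi_backfill(ticker: str = 'AAPL', company: str = 'Apple') -> pd.DataFrame:
    import os

    client = NewsAPIClient(api_key=os.environ.get('NEWSAPI_KEY'))
    start = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
    # fetch_for_ticker combines title/description/content into 'text' and parses 'date'.
    return client.fetch_for_ticker(ticker, company, from_date=start, page_size=50)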


class RSSFeedClient:
    """
    RSS Feed client for real-time financial news.

    No API key needed! Just RSS feeds from financial websites.
    """

    FINANCIAL_FEEDS = {
        'yahoo_finance': 'https://finance.yahoo.com/news/rssindex',
        'marketwatch': 'https://www.marketwatch.com/rss/topstories',
        'seeking_alpha': 'https://seekingalpha.com/market_currents.xml',
        'investing_com': 'https://www.investing.com/rss/news.rss',
        'barrons': 'https://www.barrons.com/articles/rss',
        'wall_street_journal': 'https://feeds.a.dj.com/rss/WSJcomUSBusiness.xml',
        'reuters_business': 'https://www.reutersagency.com/feed/?taxonomy=markets&post_type=reuters-best',
        'benzinga': 'https://www.benzinga.com/feed',
        'the_street': 'https://www.thestreet.com/.rss/full/',
    }

    def __init__(self):
        # Per-instance copy so add_custom_feed does not mutate the shared class default.
        self.feeds = dict(self.FINANCIAL_FEEDS)

    def fetch_feed(self, feed_url: str, max_entries: int = 50) -> List[Dict]:
        """Fetch and parse an RSS feed"""
        if not FEEDPARSER_AVAILABLE:
            print("WARNING: feedparser not available. Install with: pip install feedparser")
            return []

        try:
            feed = feedparser.parse(feed_url)

            articles = []
            for entry in feed.entries[:max_entries]:
                articles.append({
                    'title': entry.get('title', ''),
                    'description': entry.get('summary', entry.get('description', '')),
                    'content': entry.get('summary', ''),
                    'published_at': entry.get('published', entry.get('updated', '')),
                    'source': feed.feed.get('title', 'Unknown'),
                    'url': entry.get('link', ''),
                    'author': entry.get('author', '')
                })

            return articles

        except Exception as e:
            print(f"Error fetching RSS feed {feed_url}: {e}")
            return []

    def fetch_all_feeds(self, max_entries_per_feed: int = 20) -> pd.DataFrame:
        """Fetch all configured financial feeds"""
        all_articles = []

        for name, url in self.feeds.items():
            print(f"Fetching {name}...")
            articles = self.fetch_feed(url, max_entries_per_feed)
            for a in articles:
                a['feed_source'] = name
            all_articles.extend(articles)

        if not all_articles:
            return pd.DataFrame()

        df = pd.DataFrame(all_articles)
        df['text'] = df['title'].fillna('') + '. ' + df['description'].fillna('')
        df['date'] = pd.to_datetime(df['published_at'], errors='coerce')

        return df

    def add_custom_feed(self, name: str, url: str):
        """Add a custom RSS feed to this client instance"""
        self.feeds[name] = url
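

# Hedged usage sketch for RSSFeedClient. The custom feed URL below is a placeholder,
# not a feed this module ships with; if feedparser is not installed every fetch
# returns empty and so does the resulting DataFrame.
def example_rss_snapshot() -> pd.DataFrame:
    client = RSSFeedClient()
    client.add_custom_feed('my_feed', 'https://example.com/rss.xml')  # placeholder URL
    return client.fetch_all_feeds(max_entries_per_feed=5)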


class GDELTClient:
    """
    GDELT Project (Global Database of Events, Language, and Tone) client.

    Free, massive global news database.
    https://www.gdeltproject.org/

    GDELT provides:
    - Every news article worldwide (updated every 15 minutes)
    - Sentiment scoring (tone)
    - Event coding
    - Geographic tagging
    """

    GDELT_URL = "http://data.gdeltproject.org/gdeltv2/lastupdate.txt"

    def __init__(self):
        pass

    def fetch_latest_updates(self) -> pd.DataFrame:
        """Fetch latest GDELT update URLs"""
        try:
            import requests
            response = requests.get(self.GDELT_URL, timeout=30)
            response.raise_for_status()

            lines = response.text.strip().split('\n')

            # Each line of lastupdate.txt is "<file size> <md5 hash> <url>".
            updates = []
            for line in lines:
                parts = line.split()
                if len(parts) >= 3:
                    url = parts[2]
                    updates.append({
                        'size': parts[0],
                        'url': url,
                        'type': ('events' if 'export' in url
                                 else 'mentions' if 'mentions' in url
                                 else 'gkg')
                    })

            return pd.DataFrame(updates)

        except Exception as e:
            print(f"Error fetching GDELT updates: {e}")
            return pd.DataFrame()

    def fetch_gdelt_csv(self, url: str) -> pd.DataFrame:
        """Fetch and parse a GDELT CSV file"""
        try:
            import requests
            import zipfile
            import io

            response = requests.get(url, timeout=60)
            response.raise_for_status()

            # GDELT files are zipped tab-separated CSVs with no header row.
            with zipfile.ZipFile(io.BytesIO(response.content)) as z:
                csv_name = z.namelist()[0]
                with z.open(csv_name) as f:
                    df = pd.read_csv(f, sep='\t', header=None, low_memory=False)

            # The GDELT 2.0 events export has 61 tab-separated columns (v2 adds an
            # ADM2Code field to each of the three geo blocks).
            if 'export' in url:
                columns = ['GlobalEventID', 'Day', 'MonthYear', 'Year', 'FractionDate',
                           'Actor1Code', 'Actor1Name', 'Actor1CountryCode', 'Actor1KnownGroupCode',
                           'Actor1EthnicCode', 'Actor1Religion1Code', 'Actor1Religion2Code',
                           'Actor1Type1Code', 'Actor1Type2Code', 'Actor1Type3Code',
                           'Actor2Code', 'Actor2Name', 'Actor2CountryCode', 'Actor2KnownGroupCode',
                           'Actor2EthnicCode', 'Actor2Religion1Code', 'Actor2Religion2Code',
                           'Actor2Type1Code', 'Actor2Type2Code', 'Actor2Type3Code',
                           'IsRootEvent', 'EventCode', 'EventBaseCode', 'EventRootCode',
                           'QuadClass', 'GoldsteinScale', 'NumMentions', 'NumSources',
                           'NumArticles', 'AvgTone',
                           'Actor1Geo_Type', 'Actor1Geo_FullName', 'Actor1Geo_CountryCode',
                           'Actor1Geo_ADM1Code', 'Actor1Geo_ADM2Code', 'Actor1Geo_Lat',
                           'Actor1Geo_Long', 'Actor1Geo_FeatureID',
                           'Actor2Geo_Type', 'Actor2Geo_FullName', 'Actor2Geo_CountryCode',
                           'Actor2Geo_ADM1Code', 'Actor2Geo_ADM2Code', 'Actor2Geo_Lat',
                           'Actor2Geo_Long', 'Actor2Geo_FeatureID',
                           'ActionGeo_Type', 'ActionGeo_FullName', 'ActionGeo_CountryCode',
                           'ActionGeo_ADM1Code', 'ActionGeo_ADM2Code', 'ActionGeo_Lat',
                           'ActionGeo_Long', 'ActionGeo_FeatureID',
                           'DATEADDED', 'SOURCEURL']
                # Only label when the width matches; otherwise keep positional columns
                # rather than mislabel them.
                if len(df.columns) == len(columns):
                    df.columns = columns

            return df

        except Exception as e:
            print(f"Error fetching GDELT data: {e}")
            return pd.DataFrame()
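

# Hedged usage sketch for GDELTClient: averages GDELT's AvgTone per source URL from the
# most recent 15-minute events export. The helper name and the tone aggregation are
# illustrative choices, not part of the GDELT API itself.
def example_gdelt_avg_tone() -> pd.DataFrame:
    client = GDELTClient()
    updates = client.fetch_latest_updates()
    if updates.empty:
        return pd.DataFrame()
    event_urls = updates.loc[updates['type'] == 'events', 'url']
    if event_urls.empty:
        return pd.DataFrame()
    events = client.fetch_gdelt_csv(event_urls.iloc[0])
    if events.empty or 'AvgTone' not in events.columns:
        return pd.DataFrame()
    # AvgTone is GDELT's document-level tone score (nominally -100..+100, usually -10..+10).
    return events.groupby('SOURCEURL')['AvgTone'].mean().reset_index()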


class SocialMediaScraper:
    """
    Social media sentiment scraper (Reddit, StockTwits, Twitter/X).

    Note: Twitter API now requires paid access ($100/month basic tier).
    Reddit API has rate limits but free tier available.
    StockTwits has free API for basic usage.
    """

    REDDIT_SUBREDDITS = [
        'wallstreetbets', 'stocks', 'investing', 'StockMarket',
        'options', 'pennystocks', 'SecurityAnalysis', 'algotrading'
    ]

    def __init__(self):
        pass

    def fetch_reddit_posts(self,
                           subreddit: str,
                           limit: int = 100,
                           time_filter: str = 'day') -> pd.DataFrame:
        """
        Fetch Reddit posts from a subreddit.

        Requires: pip install praw
        You need Reddit API credentials (free at reddit.com/prefs/apps)
        """
        try:
            import praw
        except ImportError:
            print("WARNING: praw not available. Install with: pip install praw")
            return pd.DataFrame()

        # Stub: prints the integration pattern instead of fetching live data.
        print("REDDIT INTEGRATION PATTERN:")
        print(" 1. Create app at https://www.reddit.com/prefs/apps")
        print(" 2. Get client_id and client_secret")
        print(" 3. Initialize: praw.Reddit(client_id='...', client_secret='...', user_agent='...')")
        print(" 4. Fetch: reddit.subreddit('wallstreetbets').hot(limit=100)")

        return pd.DataFrame()

    def fetch_stocktwits_feed(self,
                              ticker: str,
                              limit: int = 30) -> pd.DataFrame:
        """
        Fetch StockTwits messages for a ticker.

        StockTwits API: https://api.stocktwits.com/developers/docs
        Free tier available for basic usage.
        """
        try:
            import requests
        except ImportError:
            print("WARNING: requests not available")
            return pd.DataFrame()

        url = f"https://api.stocktwits.com/api/2/streams/symbol/{ticker}.json"

        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            data = response.json()

            messages = data.get('messages', [])

            return pd.DataFrame([{
                'text': m.get('body', ''),
                'created_at': m.get('created_at', ''),
                'username': m.get('user', {}).get('username', ''),
                # sentiment entity is null when the poster did not tag bullish/bearish
                'sentiment': (m.get('entities', {}).get('sentiment') or {}).get('basic', 'neutral'),
                'likes': m.get('likes', {}).get('total', 0),
                'ticker': ticker
            } for m in messages[:limit]])

        except Exception as e:
            print(f"Error fetching StockTwits: {e}")
            return pd.DataFrame()


class NewsPipeline:
    """
    Complete news pipeline: fetch -> preprocess -> sentiment -> aggregate.

    Connects NewsAPI + RSS feeds + Social media into one unified feed.
    """

    def __init__(self,
                 news_api_key: Optional[str] = None,
                 use_rss: bool = True,
                 use_gdelt: bool = False,
                 use_social: bool = False):
        self.news_api = NewsAPIClient(news_api_key)
        self.rss_client = RSSFeedClient()
        self.gdelt_client = GDELTClient()
        self.social_scraper = SocialMediaScraper()

        self.use_rss = use_rss
        self.use_gdelt = use_gdelt
        self.use_social = use_social

    def fetch_all(self,
                  tickers: List[str],
                  company_names: Optional[Dict[str, str]] = None,
                  from_date: Optional[str] = None,
                  to_date: Optional[str] = None) -> pd.DataFrame:
        """
        Fetch news from ALL sources for given tickers.

        Returns unified DataFrame with all articles.
        """
        all_news = []

        # Build {ticker: company_name}, falling back to the ticker itself.
        if company_names:
            ticker_map = {t: company_names.get(t, t) for t in tickers}
        else:
            ticker_map = {t: t for t in tickers}

        print("[NewsAPI] Fetching financial news...")
        try:
            news_api_df = self.news_api.fetch_multiple_tickers(
                ticker_map, from_date, to_date
            )
            if not news_api_df.empty:
                news_api_df['source_type'] = 'newsapi'
                all_news.append(news_api_df)
        except Exception as e:
            print(f" NewsAPI error: {e}")

        if self.use_rss:
            print("[RSS] Fetching financial feeds...")
            try:
                rss_df = self.rss_client.fetch_all_feeds(max_entries_per_feed=10)
                if not rss_df.empty:
                    rss_df['source_type'] = 'rss'
                    # Tag RSS articles by keyword-matching tickers; drop unmatched ones.
                    rss_df['ticker'] = rss_df['text'].apply(
                        lambda x: self._extract_tickers(str(x), tickers)
                    )
                    rss_df = rss_df[rss_df['ticker'].notna()]
                    if not rss_df.empty:
                        all_news.append(rss_df)
            except Exception as e:
                print(f" RSS error: {e}")

        if all_news:
            combined = pd.concat(all_news, ignore_index=True)
            combined['text'] = combined['text'].fillna('')
            # utc=True avoids errors when naive and tz-aware timestamps mix across sources.
            combined['date'] = pd.to_datetime(combined['date'], errors='coerce', utc=True)
            return combined.sort_values('date', ascending=False)

        return pd.DataFrame()

    def _extract_tickers(self, text: str, tickers: List[str]) -> Optional[str]:
        """Simple keyword matching to tag articles with tickers"""
        # Pad with spaces so symbols at the start or end of the text still match.
        text_upper = f" {text.upper()} "
        for ticker in tickers:
            if f' {ticker} ' in text_upper or f'${ticker}' in text_upper:
                return ticker
        return None

    def aggregate_daily_sentiment(self,
                                  news_df: pd.DataFrame,
                                  sentiment_fn: Optional[Callable] = None) -> pd.DataFrame:
        """
        Aggregate news into daily sentiment scores per ticker.

        Requires sentiment_fn that takes text -> dict with 'sentiment_score'.
        If not provided, returns raw counts only.
        """
        if news_df.empty:
            return pd.DataFrame()

        # Work on a copy and normalize timestamps to calendar dates for daily grouping.
        news_df = news_df.copy()
        news_df['date'] = pd.to_datetime(news_df['date'], errors='coerce', utc=True)
        news_df['date'] = news_df['date'].dt.date

        if sentiment_fn is not None:
            print("Computing sentiment scores...")
            sentiments = []
            for text in news_df['text']:
                try:
                    result = sentiment_fn(str(text))
                    sentiments.append(result.get('sentiment_score', 0))
                except Exception:
                    # Fall back to neutral if scoring a single text fails.
                    sentiments.append(0)
            news_df['sentiment_score'] = sentiments
        else:
            news_df['sentiment_score'] = 0

        # Aggregate per ticker per calendar day.
        daily = news_df.groupby(['date', 'ticker']).agg({
            'sentiment_score': ['mean', 'std', 'count'],
            'text': 'first'
        }).reset_index()

        # Flatten the MultiIndex columns produced by the aggregation.
        daily.columns = ['date', 'ticker', 'sentiment_mean', 'sentiment_std',
                         'article_count', 'sample_text']

        # Confidence scales with coverage: 5+ articles in a day gets full weight.
        daily['confidence'] = np.minimum(daily['article_count'] / 5, 1.0)
        daily['sentiment_alpha'] = daily['sentiment_mean'] * daily['confidence']

        return daily
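

# Hedged sketch of a sentiment_fn compatible with aggregate_daily_sentiment: a toy
# keyword scorer returning {'sentiment_score': value in [-1, 1]}. It is illustrative
# only -- swap in a real model (FinBERT, VADER, etc.) exposing the same interface.
def example_keyword_sentiment(text: str) -> Dict[str, float]:
    positive = ('beat', 'beats', 'upgrade', 'strong', 'growth', 'buy', 'surge', 'record')
    negative = ('miss', 'downgrade', 'weak', 'lawsuit', 'scrutiny', 'falls', 'sell-off')
    words = text.lower()
    pos = sum(word in words for word in positive)
    neg = sum(word in words for word in negative)
    total = pos + neg
    return {'sentiment_score': (pos - neg) / total if total else 0.0}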


if __name__ == '__main__':
    # Quick smoke test: without an API key the NewsAPI client returns mock articles.
    pipeline = NewsPipeline()

    news = pipeline.news_api.fetch_for_ticker('AAPL', 'Apple', page_size=5)
    print(f"Fetched {len(news)} articles for AAPL")
    print(news[['title', 'source', 'date']].head())
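
    # Hedged end-to-end demo: combine NewsAPI (mock without a key) and RSS feeds, then
    # aggregate daily sentiment with the toy keyword scorer sketched above.
    combined = pipeline.fetch_all(['AAPL'], company_names={'AAPL': 'Apple'})
    if not combined.empty:
        daily = pipeline.aggregate_daily_sentiment(
            combined, sentiment_fn=example_keyword_sentiment
        )
        print(daily.head())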
|