| """ |
| Professional Finance News Scraper - Direct from Source Websites |
Scrapes: CNBC, WSJ, Bloomberg, FT, The Economist, BBC, Yahoo Finance, Google News, Fed, ECB, IMF
| No Twitter API needed - direct RSS and web scraping |
| """ |
|
|
| from datetime import datetime, timedelta |
| from typing import List, Dict, Optional |
|
|
| import logging |
| import re |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| import requests |
| import pandas as pd |
| import feedparser |
| import streamlit as st |
| from bs4 import BeautifulSoup |
|
|
|
|
| |
# Module-level logger used by the scraper.
# NOTE(review): calling basicConfig at import time configures the root logger
# and can override a host application's logging setup — consider leaving
# configuration to the application entry point.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
class FinanceNewsScraper:
    """
    Professional-grade financial news scraper using RSS feeds and web scraping.

    All sources are publicly accessible, so no authentication is required.
    Fetched items are cached on the instance (``news_cache``) and refreshed by
    ``get_news`` once they are older than ``cache_ttl`` seconds.
    """

    # Source registry. Per entry:
    #   rss            -- RSS/Atom feed URL
    #   web            -- main-page URL for direct HTML scraping (None = RSS only)
    #   selectors      -- CSS selectors ('headline'/'link') used when scraping 'web'
    #   weight         -- credibility weight; _assess_impact maps >= 1.7 to 'high'
    #   web_priority   -- web-scraped items from this source sort first
    #   specialization -- categories this source is authoritative for
    SOURCES = {
        'cnbc': {
            'name': 'CNBC',
            'rss': 'https://www.cnbc.com/id/100003114/device/rss/rss.html',
            'web': 'https://www.cnbc.com/world/',
            'selectors': {'headline': 'a.Card-title', 'link': 'a.Card-title'},
            'weight': 1.2,
            'web_priority': True,
            'specialization': ['markets']
        },
        'wsj_markets': {
            'name': 'WSJ Markets',
            'rss': 'https://feeds.a.dj.com/rss/RSSMarketsMain.xml',
            'web': None,
            'weight': 1.4,
            'specialization': ['markets']
        },
        'bloomberg_markets': {
            'name': 'Bloomberg',
            'rss': 'https://feeds.bloomberg.com/markets/news.rss',
            'web': None,
            'weight': 1.5,
            'specialization': ['markets']
        },
        'ft_markets': {
            'name': 'Financial Times',
            'rss': 'https://www.ft.com/markets?format=rss',
            'web': 'https://www.ft.com/markets',
            'selectors': {'headline': 'div.o-teaser__heading', 'link': 'a.js-teaser-heading-link'},
            'weight': 1.4,
            'web_priority': True,
            'specialization': ['markets']
        },
        'economist': {
            'name': 'The Economist',
            'rss': 'https://www.economist.com/finance-and-economics/rss.xml',
            'web': None,
            'weight': 1.3,
            'specialization': ['macro', 'geopolitical']
        },
        'bbc_business': {
            'name': 'BBC Business',
            'rss': 'http://feeds.bbci.co.uk/news/business/rss.xml',
            'web': 'https://www.bbc.com/news/business',
            'selectors': {'headline': 'h2[data-testid="card-headline"]', 'link': 'a[data-testid="internal-link"]'},
            'weight': 1.4,
            'web_priority': True,
            'specialization': ['geopolitical', 'macro']
        },
        'yahoo_finance': {
            'name': 'Yahoo Finance',
            'rss': 'https://finance.yahoo.com/news/rssindex',
            'web': 'https://finance.yahoo.com/',
            'selectors': {'headline': 'h3.clamp', 'link': 'a'},
            'weight': 1.3,
            'web_priority': True,
            'specialization': ['markets', 'macro']
        },
        'google_news_finance': {
            'name': 'Google News Finance',
            'rss': 'https://news.google.com/rss/search?q=finance+OR+stocks+OR+markets+OR+economy&hl=en-US&gl=US&ceid=US:en',
            'web': None,
            'weight': 1.2,
            'specialization': ['markets', 'macro', 'geopolitical']
        },
        'google_news_business': {
            'name': 'Google News Business',
            'rss': 'https://news.google.com/rss/topics/CAAqJggKIiBDQkFTRWdvSUwyMHZNRGx6TVdZU0FtVnVHZ0pWVXlnQVAB',
            'web': None,
            'weight': 1.2,
            'specialization': ['markets', 'macro']
        },
        'federal_reserve': {
            'name': 'Federal Reserve',
            'rss': 'https://www.federalreserve.gov/feeds/press_all.xml',
            'web': None,
            'weight': 2.0,
            'specialization': ['macro']
        },
        'ecb': {
            'name': 'European Central Bank',
            'rss': 'https://www.ecb.europa.eu/rss/press.xml',
            'web': None,
            'weight': 2.0,
            'specialization': ['macro']
        },
        'imf': {
            'name': 'IMF',
            'rss': 'https://www.imf.org/en/news/rss',
            'web': None,
            'weight': 1.7,
            'specialization': ['macro', 'geopolitical']
        }
    }

    # Keyword lists consumed (case-insensitively) by _categorize_text.
    MACRO_KEYWORDS = [
        'Fed', 'ECB', 'BoE', 'BoJ', 'FOMC', 'Powell', 'Lagarde',
        'interest rate', 'rate cut', 'rate hike', 'inflation', 'CPI',
        'GDP', 'unemployment', 'jobs report', 'NFP', 'monetary policy'
    ]

    MARKET_KEYWORDS = [
        'S&P', 'Dow', 'Nasdaq', 'earnings', 'EPS', 'stock', 'equity',
        'rally', 'selloff', 'correction', 'merger', 'acquisition', 'IPO'
    ]

    GEOPOLITICAL_KEYWORDS = [
        'war', 'conflict', 'sanctions', 'trade', 'tariff', 'crisis',
        'Ukraine', 'Russia', 'China', 'Taiwan', 'Middle East'
    ]

    def __init__(self):
        """Initialize the HTTP session and the in-memory cache state."""
        # Cache state consumed by get_news()/get_main_page_news().
        # Bug fix: these attributes were never initialized, so the first call
        # to any cached accessor raised AttributeError.
        self.news_cache: List[Dict] = []
        self.last_fetch: Optional[datetime] = None
        self.cache_ttl: int = 300  # seconds a fetch stays fresh

        self.session = requests.Session()
        # Browser-like headers: several sources reject the default
        # python-requests User-Agent.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })

    def _fetch_rss_feed(self, source_name: str, source_info: Dict) -> List[Dict]:
        """Fetch and parse one RSS feed into a list of normalized news dicts.

        Returns an empty list on any failure so one broken feed cannot take
        down the whole scrape.
        """
        try:
            feed = feedparser.parse(source_info['rss'])

            if not feed.entries:
                logger.warning(f"No entries found for {source_name}")
                return []

            news_items = []
            for entry in feed.entries[:10]:
                # Prefer the published date, fall back to updated, then "now".
                # NOTE(review): feedparser's *_parsed tuples are UTC while
                # datetime.now() is local time -- confirm whether the 1-day
                # freshness window should be timezone-aware.
                try:
                    if hasattr(entry, 'published_parsed') and entry.published_parsed:
                        timestamp = datetime(*entry.published_parsed[:6])
                    elif hasattr(entry, 'updated_parsed') and entry.updated_parsed:
                        timestamp = datetime(*entry.updated_parsed[:6])
                    else:
                        timestamp = datetime.now()
                except (TypeError, ValueError):
                    # Malformed date tuple: treat the entry as fresh rather than
                    # dropping it. (Was a bare `except:` that also hid real bugs.)
                    timestamp = datetime.now()

                # Skip stale items; feeds often carry days-old entries.
                if (datetime.now() - timestamp).days > 1:
                    continue

                title = entry.get('title', '')
                summary = entry.get('summary', '') or entry.get('description', '')

                # Feed summaries frequently embed HTML; strip to plain text.
                if summary:
                    summary = BeautifulSoup(summary, 'html.parser').get_text()
                    summary = self._extract_summary(summary)

                url = entry.get('link', '')

                # Derive classification signals from title + summary.
                text = f"{title} {summary}"
                category = self._categorize_text(text, source_info['specialization'])
                sentiment = self._analyze_sentiment(text)
                impact = self._assess_impact(source_info['weight'], title)
                is_breaking = self._detect_breaking_news(title)

                news_items.append({
                    # NOTE(review): hash() of a str is salted per process, so
                    # ids are stable within a run (good enough for dedup) but
                    # not across runs -- do not persist them.
                    'id': hash(url),
                    'title': title,
                    'summary': summary or self._extract_summary(title),
                    'source': source_info['name'],
                    'category': category,
                    'timestamp': timestamp,
                    'sentiment': sentiment,
                    'impact': impact,
                    'url': url,
                    'likes': 0,
                    'retweets': 0,
                    'is_breaking': is_breaking,
                    'source_weight': source_info['weight'],
                    'from_web': False
                })

            return news_items

        except Exception as e:
            logger.error(f"Error fetching RSS for {source_name}: {e}")
            return []

    def _scrape_web_page(self, source_name: str, source_info: Dict) -> List[Dict]:
        """Scrape news headlines directly from a source's main web page.

        Returns an empty list on any failure; per-headline parse errors are
        logged at debug level and skipped.
        """
        try:
            response = self.session.get(source_info['web'], timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'lxml')

            # selectors['link'] is currently unused: the link is resolved
            # relative to the matched headline element instead.
            headline_selector = source_info['selectors']['headline']

            news_items = []
            headlines = soup.select(headline_selector)

            for headline_elem in headlines[:10]:
                try:
                    title = headline_elem.get_text(separator=' ', strip=True)
                    title = re.sub(r'\s+', ' ', title)     # collapse whitespace
                    title = re.sub(r'<[^>]+>', '', title)  # belt-and-braces tag strip
                    title = unescape(title)                # decode HTML entities

                    if not title or len(title) < 10:
                        continue

                    # Defensive: selectors occasionally match style/markup blobs
                    # instead of real headlines.
                    if any(marker in title for marker in ['<!--', '-->', 'style=', '<div', '</div>', '<span', '</span>', 'justify-content', 'flex:', 'padding:']):
                        logger.warning(f"Skipping malformed title from {source_name} (contains HTML): {title[:100]}...")
                        continue

                    if len(title) > 500:
                        logger.warning(f"Skipping suspiciously long title from {source_name}: {len(title)} chars")
                        continue

                    # Locate the anchor: the headline itself, a child <a>,
                    # an ancestor <a>, or an <a> under the same parent.
                    link_elem = headline_elem if headline_elem.name == 'a' else headline_elem.find('a')
                    if not link_elem:
                        link_elem = headline_elem.find_parent('a')
                    if not link_elem:
                        parent = headline_elem.find_parent()
                        if parent:
                            link_elem = parent.find('a')

                    if not link_elem:
                        continue

                    url = link_elem.get('href', '')
                    if not url:
                        continue

                    if url.startswith('/'):
                        # Resolve site-relative links against the page URL.
                        url = urljoin(source_info['web'], url)

                    # Drop javascript:/mailto:/fragment links.
                    if not url.startswith('http'):
                        continue

                    title = title.replace('\n', ' ').replace('\r', ' ').strip()

                    category = self._categorize_text(title, source_info['specialization'])
                    sentiment = self._analyze_sentiment(title)
                    impact = self._assess_impact(source_info['weight'], title)
                    is_breaking = self._detect_breaking_news(title)

                    summary = self._extract_summary(title) if len(title) > 150 else title

                    news_items.append({
                        'id': hash(url),
                        'title': title,
                        'summary': summary,
                        'source': source_info['name'],
                        'category': category,
                        # No publish date on main pages; use scrape time.
                        'timestamp': datetime.now(),
                        'sentiment': sentiment,
                        'impact': impact,
                        'url': url,
                        'likes': 0,
                        'retweets': 0,
                        'is_breaking': is_breaking,
                        'source_weight': source_info['weight'],
                        'from_web': True
                    })

                except Exception as e:
                    logger.debug(f"Error parsing headline from {source_name}: {e}")
                    continue

            logger.info(f"Scraped {len(news_items)} items from {source_name} web page")
            return news_items

        except Exception as e:
            logger.error(f"Error scraping web page for {source_name}: {e}")
            return []

    def scrape_news(self, max_items: int = 100) -> List[Dict]:
        """
        Scrape news from all sources in parallel (RSS and, where configured,
        the main web page), deduplicate by URL, and return the top items
        sorted web-first, breaking-first, high-impact-first, newest-first.

        Falls back to mock data when every source fails.
        """
        all_news = []
        seen_urls = set()

        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = []

            for name, info in self.SOURCES.items():
                futures.append((executor.submit(self._fetch_rss_feed, name, info), name, 'RSS'))
                if info.get('web'):
                    futures.append((executor.submit(self._scrape_web_page, name, info), name, 'Web'))

            for future, source_name, method in futures:
                try:
                    news_items = future.result()

                    # Deduplicate across sources/methods by URL.
                    unique_items = []
                    for item in news_items:
                        if item['url'] not in seen_urls:
                            seen_urls.add(item['url'])
                            unique_items.append(item)

                    all_news.extend(unique_items)
                    if len(unique_items) > 0:
                        logger.info(f"Fetched {len(unique_items)} unique items from {source_name} ({method})")
                except Exception as e:
                    logger.error(f"Error processing {source_name} ({method}): {e}")

        if not all_news:
            logger.warning("No news fetched from any source - using mock data")
            return self._get_mock_news()

        # Tuple key: web-scraped > breaking > high impact > most recent.
        all_news.sort(
            key=lambda x: (x.get('from_web', False), x['is_breaking'], x['impact'] == 'high', x['timestamp']),
            reverse=True
        )

        logger.info(f"Total unique news items: {len(all_news)} (Web: {sum(1 for n in all_news if n.get('from_web'))}, RSS: {sum(1 for n in all_news if not n.get('from_web'))})")
        return all_news[:max_items]

    def get_main_page_news(self) -> pd.DataFrame:
        """Return only web-scraped (main-page) items as a DataFrame."""
        # Lazily populate the cache on first use.
        if not self.news_cache:
            self.news_cache = self.scrape_news(max_items=100)
            self.last_fetch = datetime.now()

        main_news = [n for n in self.news_cache if n.get('from_web', False)]
        df = pd.DataFrame(main_news)
        if not df.empty:
            df['timestamp'] = pd.to_datetime(df['timestamp'])
        return df

    def _categorize_text(self, text: str, source_specialization: List[str]) -> str:
        """Categorize text as 'macro', 'markets' or 'geopolitical'.

        Keyword hits are counted case-insensitively, boosted 1.5x when the
        source specializes in that category; ties/no-hits default to 'markets'.
        """
        text_lower = text.lower()

        macro_score = sum(1 for kw in self.MACRO_KEYWORDS if kw.lower() in text_lower)
        market_score = sum(1 for kw in self.MARKET_KEYWORDS if kw.lower() in text_lower)
        geo_score = sum(1 for kw in self.GEOPOLITICAL_KEYWORDS if kw.lower() in text_lower)

        # Boost categories the source is authoritative for.
        if 'macro' in source_specialization:
            macro_score *= 1.5
        if 'markets' in source_specialization:
            market_score *= 1.5
        if 'geopolitical' in source_specialization:
            geo_score *= 1.5

        scores = {'macro': macro_score, 'markets': market_score, 'geopolitical': geo_score}
        return max(scores, key=scores.get) if max(scores.values()) > 0 else 'markets'

    def _analyze_sentiment(self, text: str) -> str:
        """Classify text as 'positive'/'negative'/'neutral' by keyword counts."""
        text_lower = text.lower()

        positive = ['surge', 'soar', 'rally', 'beat', 'upgrade', 'bullish',
                    'gain', 'rise', 'jump', 'boost', 'positive']
        negative = ['plunge', 'crash', 'fall', 'miss', 'downgrade', 'bearish',
                    'loss', 'drop', 'slide', 'concern', 'negative']

        pos_count = sum(1 for word in positive if word in text_lower)
        neg_count = sum(1 for word in negative if word in text_lower)

        if pos_count > neg_count:
            return 'positive'
        elif neg_count > pos_count:
            return 'negative'
        return 'neutral'

    def _assess_impact(self, source_weight: float, title: str) -> str:
        """Assess market impact ('high'/'medium'/'low') from weight and title."""
        # Official sources (Fed, ECB, IMF) are always high impact.
        if source_weight >= 1.7:
            return 'high'

        # Headline signals override the weight-based tiers.
        high_impact_words = ['breaking', 'alert', 'emergency', 'crash', 'surge', 'fed']
        if any(word in title.lower() for word in high_impact_words):
            return 'high'

        return 'medium' if source_weight >= 1.3 else 'low'

    def _detect_breaking_news(self, text: str) -> bool:
        """Return True when the text carries a breaking-news marker."""
        text_upper = text.upper()
        breaking_signals = ['BREAKING', 'ALERT', 'URGENT', 'JUST IN', 'DEVELOPING']
        return any(signal in text_upper for signal in breaking_signals)

    def _extract_summary(self, text: str, max_length: int = 150) -> str:
        """Strip URLs and truncate text to max_length chars (with ellipsis)."""
        text = re.sub(r'http\S+', '', text)
        text = text.strip()

        if len(text) <= max_length:
            return text
        return text[:max_length] + '...'

    def _get_mock_news(self) -> List[Dict]:
        """Mock data fallback used when every live source fails.

        Items carry the same schema as real ones (including 'from_web') so
        downstream filters behave identically.
        """
        return [
            {
                'id': 1,
                'title': 'Federal Reserve holds rates steady, signals caution on inflation outlook',
                'summary': 'Fed maintains current rate policy',
                'source': 'Federal Reserve',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(minutes=15),
                'sentiment': 'neutral',
                'impact': 'high',
                'url': 'https://www.federalreserve.gov',
                'likes': 0,
                'retweets': 0,
                'is_breaking': False,
                'source_weight': 2.0,
                'from_web': False
            },
            {
                'id': 2,
                'title': 'S&P 500 closes at record high as tech stocks rally on strong earnings',
                'summary': 'S&P 500 hits record on tech rally',
                'source': 'CNBC',
                'category': 'markets',
                'timestamp': datetime.now() - timedelta(minutes=30),
                'sentiment': 'positive',
                'impact': 'high',
                'url': 'https://www.cnbc.com',
                'likes': 0,
                'retweets': 0,
                'is_breaking': False,
                'source_weight': 1.2,
                'from_web': False
            },
            {
                'id': 3,
                'title': 'ECB President Lagarde warns of persistent inflation pressures in eurozone',
                'summary': 'Lagarde warns on eurozone inflation',
                'source': 'European Central Bank',
                'category': 'macro',
                'timestamp': datetime.now() - timedelta(hours=1),
                'sentiment': 'negative',
                'impact': 'high',
                'url': 'https://www.ecb.europa.eu',
                'likes': 0,
                'retweets': 0,
                'is_breaking': False,
                'source_weight': 2.0,
                'from_web': False
            }
        ]

    def get_news(self, category: str = 'all', sentiment: str = 'all',
                 impact: str = 'all', refresh: bool = False) -> pd.DataFrame:
        """Return filtered news as a DataFrame, refetching when stale.

        Filters with value 'all' are disabled. `refresh=True` forces a fetch.
        """
        # Bug fix: use total_seconds() -- timedelta.seconds wraps every 24 h,
        # so a cache older than a day would be treated as fresh again.
        if refresh or self.last_fetch is None or \
                (datetime.now() - self.last_fetch).total_seconds() > self.cache_ttl:
            self.news_cache = self.scrape_news(max_items=100)
            self.last_fetch = datetime.now()

        news = self.news_cache.copy()

        if category != 'all':
            news = [n for n in news if n['category'] == category]
        if sentiment != 'all':
            news = [n for n in news if n['sentiment'] == sentiment]
        if impact != 'all':
            news = [n for n in news if n['impact'] == impact]

        df = pd.DataFrame(news)
        if not df.empty:
            df['timestamp'] = pd.to_datetime(df['timestamp'])

        return df

    def get_breaking_news(self) -> pd.DataFrame:
        """Get breaking/high-impact news (convenience wrapper over get_news)."""
        return self.get_news(impact='high')

    def get_statistics(self) -> Dict:
        """
        Get feed statistics.

        Note: statistics are now managed by NewsCacheManager; this method
        returns empty stats for backward compatibility only.
        """
        return {
            'total': 0,
            'high_impact': 0,
            'breaking': 0,
            'last_update': 'Managed by cache',
            'by_category': {}
        }
|
|