# finbert_anaylzer / data /scraper.py
# Jitendra12421's picture
# Upload 11 files
# 34efb38 verified
import asyncio
import glob
import os
import re
import ssl
import xml.etree.ElementTree as ET
from datetime import datetime
from email.utils import parsedate_to_datetime
from urllib.parse import quote_plus

import aiohttp
class NewsScraper:
    """Collect recent news headlines for a ticker from Google News RSS feeds.

    A fixed set of search queries is generated for the ticker, a handful of
    general market feeds is appended, and all feeds are fetched concurrently
    in batches. Articles are de-duplicated by link and capped at ``limit``.
    """

    # Number of feed URLs requested concurrently per batch.
    BATCH_SIZE = 20
    # Total per-request timeout, in seconds.
    FEED_TIMEOUT_SECONDS = 8
    # Pause between batches to avoid rate limiting.
    BATCH_PAUSE_SECONDS = 0.1

    # Search terms appended to the ticker ("<ticker> <suffix>"). Order matters:
    # feeds are fetched in this order and collection stops at the limit.
    _QUERY_SUFFIXES = (
        # General
        "stock", "news", "market", "earnings",
        "analyst", "forecast", "price target",
        "options", "technical", "dividend",
        "industry", "competitor", "share price",
        "hedge fund", "institutional",
        # Financial action queries
        "buy sell hold", "upgrade downgrade", "outperform underperform",
        "bullish bearish", "momentum", "breakout breakdown",
        "rally crash", "surge plunge", "soar tumble",
        "gains losses", "beat miss expectations",
        # Corporate event queries
        "CEO news", "quarterly results", "revenue profit",
        "guidance outlook", "acquisition merger",
        "lawsuit legal SEC", "insider trading",
        "IPO offering", "buyback repurchase",
        "partnership deal", "product launch",
        "layoffs restructuring", "expansion growth",
        # Analyst and research queries
        "wall street", "Goldman Sachs", "Morgan Stanley",
        "JP Morgan", "analyst rating", "price prediction",
        "short interest", "short squeeze",
        "put call ratio", "unusual activity",
        "fund holdings", "13F filing",
        # Sector and macro queries
        "sector outlook", "industry trend", "supply chain",
        "regulation policy", "inflation impact",
        "interest rate", "trade war tariff",
        "innovation technology", "ESG sustainability",
        # Time-sensitive queries
        "today", "this week", "latest",
        "breaking news", "update",
        "premarket", "after hours",
    )

    # General financial news feeds, fetched after the ticker-specific ones to
    # pad the article count.
    _GENERAL_FEEDS = (
        "https://news.google.com/rss/headlines/section/topic/BUSINESS?hl=en-US&gl=US&ceid=US:en",
        "https://news.google.com/rss/search?q=stock+market&hl=en-US&gl=US&ceid=US:en",
        "https://news.google.com/rss/search?q=wall+street+today&hl=en-US&gl=US&ceid=US:en",
        "https://news.google.com/rss/search?q=stocks+trading&hl=en-US&gl=US&ceid=US:en",
        "https://news.google.com/rss/search?q=financial+markets&hl=en-US&gl=US&ceid=US:en",
    )

    def __init__(self, limit=600, *, verify_ssl=False):
        """Create a scraper.

        Args:
            limit: Maximum number of unique articles ``scrape`` returns.
            verify_ssl: When true, keep the default certificate and hostname
                verification. The default (``False``) preserves the historic
                behaviour of disabling both.
        """
        self.limit = limit
        self.ssl_context = ssl.create_default_context()
        if not verify_ssl:
            # SECURITY: verification is disabled, so responses can be spoofed
            # by a man-in-the-middle. Pass verify_ssl=True where a CA bundle
            # is available. check_hostname must be cleared before CERT_NONE
            # can be set.
            self.ssl_context.check_hostname = False
            self.ssl_context.verify_mode = ssl.CERT_NONE

    async def fetch_feed(self, session, url):
        """Fetch one feed and return its body, or ``""`` on any failure.

        Args:
            session: An open ``aiohttp.ClientSession``.
            url: Feed URL to request.

        Returns:
            The response text for an HTTP 200 response, otherwise ``""``.
        """
        try:
            timeout = aiohttp.ClientTimeout(total=self.FEED_TIMEOUT_SECONDS)
            async with session.get(url, timeout=timeout) as response:
                if response.status == 200:
                    return await response.text()
        except Exception:
            # Best-effort: a failing feed simply contributes nothing. Catching
            # Exception (not a bare except) lets task cancellation and
            # KeyboardInterrupt propagate.
            pass
        return ""

    def parse_feed(self, xml_text, lookback_date):
        """Extract articles published on or after ``lookback_date``.

        Args:
            xml_text: Raw RSS document; empty or malformed input yields ``[]``.
            lookback_date: Earliest publication day to keep, as a ``datetime``
                or a ``date``. Only calendar dates are compared, and each
                item's date is taken in the item's own UTC offset (no
                timezone normalization is performed).

        Returns:
            A list of dicts with keys ``title``, ``link``, ``pub_date`` (the
            raw ``pubDate`` string) and ``timestamp`` (ISO 8601). Items that
            lack a title, link or parseable ``pubDate`` are skipped.
        """
        articles = []
        if not xml_text:
            return articles

        # datetime is a subclass of date, so test for the more specific type.
        if isinstance(lookback_date, datetime):
            cutoff = lookback_date.date()
        else:
            cutoff = lookback_date

        try:
            root = ET.fromstring(xml_text)
        except ET.ParseError:
            return articles

        for item in root.findall('.//item'):
            title = item.findtext('title')
            link = item.findtext('link')
            pub_date_str = item.findtext('pubDate')
            if not (title and link and pub_date_str):
                continue
            try:
                pub_dt = parsedate_to_datetime(pub_date_str)
                is_recent = pub_dt.date() >= cutoff
            except (TypeError, ValueError):
                # Unparseable date (ValueError, or TypeError on older Python
                # versions) or a cutoff that cannot be compared: skip the item.
                continue
            if is_recent:
                articles.append({
                    'title': title, 'link': link,
                    'pub_date': pub_date_str, 'timestamp': pub_dt.isoformat()
                })
        return articles

    def _build_queries(self, ticker):
        """Generate a massive, diverse set of search queries to maximize article yield."""
        return [ticker] + [f"{ticker} {suffix}" for suffix in self._QUERY_SUFFIXES]

    def _build_urls(self, ticker):
        """Return every feed URL to fetch: ticker searches, then general feeds."""
        # quote_plus escapes reserved characters (e.g. the "&" in "AT&T") that
        # would otherwise terminate the q= parameter; spaces still become "+".
        urls = [
            f"https://news.google.com/rss/search?q={quote_plus(query)}&hl=en-US&gl=US&ceid=US:en"
            for query in self._build_queries(ticker)
        ]
        urls.extend(self._GENERAL_FEEDS)
        return urls

    def _collect_unique(self, articles, collected, seen_links):
        """Append unseen articles to ``collected`` until ``self.limit`` is reached."""
        for article in articles:
            if len(collected) >= self.limit:
                return
            link = article['link']
            if link not in seen_links:
                seen_links.add(link)
                collected.append(article)

    async def scrape(self, ticker, lookback_date, progress_cb=None):
        """Fetch all feeds for ``ticker`` and return up to ``limit`` unique articles.

        Args:
            ticker: Ticker symbol used to build the search queries.
            lookback_date: Earliest publication day to keep (see ``parse_feed``).
            progress_cb: Optional callable ``(fraction, message)`` invoked
                after each batch with the fraction of ``limit`` collected
                (capped at 1.0) and a status message.

        Returns:
            A list of article dicts (see ``parse_feed``), unique by link.
        """
        urls = self._build_urls(ticker)
        collected = []
        seen_links = set()

        connector = aiohttp.TCPConnector(limit=50, ssl=self.ssl_context)
        async with aiohttp.ClientSession(connector=connector) as session:
            for start in range(0, len(urls), self.BATCH_SIZE):
                if len(collected) >= self.limit:
                    break

                batch = urls[start:start + self.BATCH_SIZE]
                results = await asyncio.gather(
                    *(self.fetch_feed(session, url) for url in batch),
                    return_exceptions=True,
                )

                for xml_text in results:
                    # gather() may hand back exception objects (including
                    # BaseException subclasses such as CancelledError).
                    if isinstance(xml_text, BaseException) or not xml_text:
                        continue
                    parsed = self.parse_feed(xml_text, lookback_date)
                    self._collect_unique(parsed, collected, seen_links)
                    if len(collected) >= self.limit:
                        # Stop parsing the rest of the batch once full.
                        break

                # Report progress
                if progress_cb:
                    scrape_progress = min(len(collected) / self.limit, 1.0)
                    progress_cb(
                        scrape_progress,
                        f"Collecting headlines: {len(collected)}/{self.limit}"
                    )

                # Small delay between batches to avoid rate limiting
                await asyncio.sleep(self.BATCH_PAUSE_SECONDS)

        print(f"[Scraper] Total unique articles collected: {len(collected)}")
        return collected[:self.limit]

    @staticmethod
    def cleanup(pattern="*.csv"):
        """Delete every file matching ``pattern`` (default: CSVs in the cwd).

        WARNING: with the default pattern this removes *all* ``.csv`` files in
        the current working directory, not only files written by this project.
        Files that cannot be removed are skipped.
        """
        for path in glob.glob(pattern):
            try:
                os.remove(path)
            except OSError:
                pass