# Commit 4506ba8 (Misbah): clean up comments and formatting across all modules
# Downloads and preprocesses the raw datasets:
# 1. Financial news articles from HuggingFace → ChromaDB + BM25
# 2. Company fundamentals (synthetic but realistic) → DuckDB
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from src.config import RAW_DATA_DIR, PROCESSED_DATA_DIR
from src.logger import logger
def _try_load_dataset(name, split="train", **kwargs):
    """Fetch a HuggingFace dataset and return it as a pandas DataFrame.

    Any failure (download, auth, conversion) is logged as a warning and
    turned into a ``None`` return, so callers can treat each dataset as
    optional and simply skip it.
    """
    from datasets import load_dataset

    try:
        return load_dataset(name, split=split, **kwargs).to_pandas()
    except Exception as e:
        logger.warning(f" Could not load {name}: {e}. Skipping.")
        return None
def download_financial_news():
    """Download financial news from HuggingFace and save it as parquet.

    Pulls two large datasets (streamed/bulk) and two smaller ones, each
    wrapped so a failed download is logged and skipped. The combined rows
    are stripped, filtered (>20 chars), de-duplicated, truncated to 1000
    chars, capped at 110k rows, and topped up with synthetic articles
    only when fewer than 100k real rows survive.

    Writes PROCESSED_DATA_DIR / "financial_news.parquet" and returns the
    combined DataFrame with columns: text, source, date.
    """
    from datasets import load_dataset
    logger.info("Downloading financial news datasets from HuggingFace...")
    frames = []
    # --- large datasets (these carry the bulk) ---
    # dataset 1: ashraq/financial-news-articles (~300k articles, we take first 80k)
    # loading via streaming to avoid pulling all 300k into RAM at once
    try:
        logger.info(" Loading ashraq/financial-news-articles (streaming, capped at 80k)...")
        ds = load_dataset("ashraq/financial-news-articles", split="train", streaming=True)
        rows = []
        for i, item in enumerate(ds):
            if i >= 80_000:
                break
            # missing title/text come back as None; coalesce to ""
            title = item.get("title") or ""
            body = item.get("text") or ""
            rows.append({
                "text": f"{title}. {body}" if title else body,
                "source": "financial_news_articles",
                # NOTE(review): hard-coded date — not the article's real
                # publication date; downstream treats it as a placeholder.
                "date": "2023-01-01",
            })
        df = pd.DataFrame(rows)
        frames.append(df)
        logger.info(f" financial_news_articles: {len(df)} rows")
        del rows  # free memory (the DataFrame keeps its own copy)
    except Exception as e:
        logger.warning(f" Could not load ashraq/financial-news-articles: {e}")
    # dataset 2: oliverwang15/news_with_gpt_instructions (~40k news items)
    try:
        logger.info(" Loading oliverwang15/news_with_gpt_instructions...")
        ds = load_dataset("oliverwang15/news_with_gpt_instructions", split="train")
        df = ds.to_pandas()
        # keep only the news text, renamed to the common "text" column
        df = df.rename(columns={"news": "text"})[["text"]].copy()
        df["source"] = "news_with_gpt"
        df["date"] = "2023-06-01"  # placeholder date, as above
        frames.append(df)
        logger.info(f" news_with_gpt: {len(df)} rows")
    except Exception as e:
        logger.warning(f" Could not load news_with_gpt: {e}")
    # --- smaller datasets (add variety in phrasing and topics) ---
    # dataset 3: zeroshot/twitter-financial-news-sentiment (~12k)
    df = _try_load_dataset("zeroshot/twitter-financial-news-sentiment")
    if df is not None and "text" in df.columns:
        df = df[["text"]].copy()
        df["source"] = "twitter_financial"
        df["date"] = "2022-06-01"  # placeholder date, as above
        frames.append(df)
        logger.info(f" twitter_financial_news: {len(df)} rows")
    # dataset 4: nickmuchi/financial-classification (~4.5k)
    # column naming varies between dataset revisions, so probe candidates
    df = _try_load_dataset("nickmuchi/financial-classification")
    if df is not None:
        text_col = next((c for c in ["text", "sentence", "Sentence"] if c in df.columns), None)
        if text_col:
            df = df.rename(columns={text_col: "text"})[["text"]].copy()
            df["source"] = "financial_classification"
            df["date"] = "2022-01-01"  # placeholder date, as above
            frames.append(df)
            logger.info(f" financial_classification: {len(df)} rows")
    # combine everything; fall back to an empty typed frame so the
    # cleaning steps below still work when every download failed
    if frames:
        combined = pd.concat(frames, ignore_index=True)
        del frames  # free memory
    else:
        combined = pd.DataFrame(columns=["text", "source", "date"])
    # basic cleaning first to reduce size before anything else
    combined["text"] = combined["text"].astype(str).str.strip()
    combined = combined[combined["text"].str.len() > 20]  # drop junk
    combined = combined.drop_duplicates(subset=["text"])
    # truncate articles to ~1000 chars — keeps the key info, massively cuts chunk count
    combined["text"] = combined["text"].str[:1000]
    total_real = len(combined)
    logger.info(f" Total real data rows after cleaning: {total_real}")
    # cap at 110k to stay within memory limits (fixed seed → reproducible sample)
    if total_real > 110_000:
        combined = combined.sample(n=110_000, random_state=42).reset_index(drop=True)
        logger.info(f" Capped to {len(combined)} rows to stay within memory limits")
    # only generate synthetic if we somehow got less than 100k real rows
    target_rows = 100_000
    if total_real < target_rows:
        shortfall = target_rows - len(combined)
        logger.info(f" Generating {shortfall} synthetic articles to reach {target_rows}...")
        synthetic = _generate_synthetic_news(shortfall)
        combined = pd.concat([combined, synthetic], ignore_index=True)
    out_path = PROCESSED_DATA_DIR / "financial_news.parquet"
    combined.to_parquet(out_path, index=False)
    logger.info(f"Saved {len(combined)} news articles to {out_path}")
    return combined
def _generate_synthetic_news(n_rows: int) -> pd.DataFrame:
# generate template-based financial news snippets with randomized
# companies, numbers, and phrasing for meaningful embedding text
import numpy as np
np.random.seed(42)
companies = [
"Apple", "Microsoft", "Google", "Amazon", "Meta", "NVIDIA", "Tesla",
"Intel", "AMD", "Salesforce", "JPMorgan Chase", "Goldman Sachs",
"Bank of America", "Wells Fargo", "Morgan Stanley", "Johnson & Johnson",
"Pfizer", "UnitedHealth", "AbbVie", "Eli Lilly", "ExxonMobil", "Chevron",
"Walmart", "Procter & Gamble", "Coca-Cola", "Nike", "Netflix",
"Visa", "Mastercard", "Berkshire Hathaway", "Adobe", "Oracle",
"PayPal", "Broadcom", "Costco", "PepsiCo", "Merck", "Thermo Fisher",
"McDonald's", "Starbucks", "Home Depot", "ConocoPhillips", "Target",
"Moderna", "CrowdStrike", "Datadog", "Snowflake", "Palantir", "Uber",
]
sectors = ["Technology", "Healthcare", "Finance", "Energy", "Consumer"]
company_sector = {}
for c in companies:
if c in ["Apple", "Microsoft", "Google", "Amazon", "Meta", "NVIDIA", "Tesla", "Intel", "AMD", "Salesforce", "Adobe", "Oracle", "PayPal", "Broadcom", "Netflix", "CrowdStrike", "Datadog", "Snowflake", "Palantir", "Uber"]:
company_sector[c] = "Technology"
elif c in ["Johnson & Johnson", "Pfizer", "UnitedHealth", "AbbVie", "Eli Lilly", "Merck", "Thermo Fisher", "Moderna"]:
company_sector[c] = "Healthcare"
elif c in ["JPMorgan Chase", "Goldman Sachs", "Bank of America", "Wells Fargo", "Morgan Stanley", "Visa", "Mastercard", "Berkshire Hathaway"]:
company_sector[c] = "Finance"
elif c in ["ExxonMobil", "Chevron", "ConocoPhillips"]:
company_sector[c] = "Energy"
else:
company_sector[c] = "Consumer"
templates = [
"{company} reported quarterly revenue of ${amount}B, {change}% {direction} year-over-year, {beat_miss} analyst expectations.",
"Analysts at {bank} upgraded {company} to {rating}, citing strong {metric} growth and improving margins in the {sector} sector.",
"{company} shares {move} {pct}% after the company announced {event}. The {sector} sector {reaction} on the news.",
"The {sector} sector faces headwinds as {challenge}. {company} and peers are expected to see {impact} in coming quarters.",
"{company}'s CEO stated that {initiative} would drive growth in {year}, with expected revenue impact of ${amount}B.",
"Wall Street is divided on {company}: bulls point to {positive}, while bears worry about {negative}.",
"Q{quarter} earnings season shows {sector} companies {trend}. {company} led the pack with {metric} of ${amount}B.",
"{bank} analysts forecast {company} will achieve ${amount}B in revenue by {year}, driven by {driver}.",
"Investor sentiment toward {company} turned {sentiment} following {catalyst}. Trading volume spiked {pct}%.",
"{company} announced a ${amount}B {action}, signaling confidence in its {sector} market position.",
"Market analysts expect the {sector} sector to {outlook} in {year} as {macro_factor} continues to shape the landscape.",
"{company} expanded its {product} division, investing ${amount}B in {area}. Competitors like {comp2} are watching closely.",
"A recent survey of fund managers shows {pct}% are overweight on {sector} stocks, with {company} as the top pick.",
"Bond yields and {sector} stocks moved in opposite directions as {company} reported mixed results for Q{quarter} {year}.",
"Regulatory concerns weigh on {company} as {regulator} scrutinizes {issue}. The stock dropped {pct}% on the news.",
"{company}'s profit margins improved to {margin}% in {year}, up from {prev_margin}% the prior year, driven by {reason}.",
"Supply chain improvements helped {company} reduce costs by ${amount}M in Q{quarter}, boosting net income expectations.",
"The {sector} sector saw M&A activity surge with {company} acquiring a smaller rival for ${amount}B.",
"{company} declared a dividend increase of {pct}%, reflecting strong cash flow generation in the {sector} space.",
"Institutional investors increased holdings in {company} by {pct}% last quarter, according to SEC filings.",
]
banks = ["Goldman Sachs", "Morgan Stanley", "JPMorgan", "Bank of America", "Citigroup", "Barclays", "UBS", "Credit Suisse", "Deutsche Bank"]
ratings = ["Buy", "Overweight", "Outperform", "Strong Buy", "Hold"]
events = ["a major restructuring plan", "a new product launch", "a strategic acquisition", "better-than-expected earnings", "a stock buyback program", "expansion into AI services", "a partnership with a tech giant"]
challenges = ["rising interest rates pressure valuations", "inflation erodes consumer spending", "regulatory uncertainty increases", "supply chain disruptions persist", "competition intensifies from new entrants"]
positives = ["strong AI revenue growth", "expanding market share", "improving profit margins", "record cash flow generation", "successful product launches"]
negatives = ["slowing user growth", "margin compression", "regulatory headwinds", "increased competition", "high valuation multiples"]
sentiments = ["bullish", "cautious", "bearish", "mixed", "optimistic"]
catalysts = ["strong earnings results", "a surprise CEO change", "new product announcement", "a major contract win", "downbeat guidance for next quarter"]
actions = ["share buyback", "acquisition", "capital investment", "debt refinancing", "strategic investment"]
drivers = ["AI adoption", "cloud migration", "international expansion", "cost optimization", "new product segments"]
areas = ["artificial intelligence", "cloud computing", "cybersecurity", "autonomous vehicles", "biotechnology", "renewable energy"]
regulators = ["the SEC", "the FTC", "the DOJ", "European regulators", "China's market authority"]
issues = ["market practices", "data privacy", "antitrust concerns", "accounting practices", "executive compensation"]
products = ["cloud", "AI", "enterprise", "consumer", "healthcare IT", "fintech", "payments"]
reasons = ["operational efficiency", "AI-driven automation", "scale advantages", "pricing power", "cost restructuring"]
macro_factors = ["Federal Reserve policy", "global trade tensions", "AI investment boom", "consumer spending trends", "energy price volatility"]
outlooks = ["outperform", "underperform", "rally", "face consolidation", "see mixed results"]
trends = ["beating estimates broadly", "showing margin compression", "reporting strong top-line growth", "missing on guidance"]
metrics = ["revenue", "EBITDA", "net income", "free cash flow", "operating profit"]
years = ["2020", "2021", "2022", "2023", "2024"]
rows = []
for i in range(n_rows):
template = templates[i % len(templates)]
company = np.random.choice(companies)
comp2 = np.random.choice([c for c in companies if c != company])
sector = company_sector[company]
text = template.format(
company=company, comp2=comp2, sector=sector,
amount=round(np.random.uniform(0.5, 150), 1),
change=round(np.random.uniform(1, 35), 1),
direction=np.random.choice(["higher", "lower"]),
beat_miss=np.random.choice(["beating", "missing", "meeting"]),
bank=np.random.choice(banks),
rating=np.random.choice(ratings),
metric=np.random.choice(metrics),
move=np.random.choice(["surged", "dropped", "climbed", "fell", "rose"]),
pct=round(np.random.uniform(1, 15), 1),
event=np.random.choice(events),
reaction=np.random.choice(["rallied", "pulled back", "held steady"]),
challenge=np.random.choice(challenges),
impact=np.random.choice(["revenue pressure", "margin expansion", "mixed results"]),
initiative=np.random.choice(["AI integration", "cloud expansion", "cost restructuring", "market expansion"]),
year=np.random.choice(years),
positive=np.random.choice(positives),
negative=np.random.choice(negatives),
sentiment=np.random.choice(sentiments),
catalyst=np.random.choice(catalysts),
action=np.random.choice(actions),
driver=np.random.choice(drivers),
area=np.random.choice(areas),
regulator=np.random.choice(regulators),
issue=np.random.choice(issues),
product=np.random.choice(products),
reason=np.random.choice(reasons),
macro_factor=np.random.choice(macro_factors),
outlook=np.random.choice(outlooks),
trend=np.random.choice(trends),
quarter=np.random.choice([1, 2, 3, 4]),
margin=round(np.random.uniform(8, 35), 1),
prev_margin=round(np.random.uniform(5, 30), 1),
)
month = np.random.randint(1, 13)
year = np.random.choice([2020, 2021, 2022, 2023, 2024])
rows.append({
"text": text,
"source": "synthetic_financial_news",
"date": f"{year}-{month:02d}-01",
})
return pd.DataFrame(rows)
def generate_company_fundamentals():
    """Generate synthetic but realistic quarterly company fundamentals.

    Covers 105 companies across 5 sectors for 2020-2024 (one row per
    company-quarter) with revenue, net income, total assets, and market
    cap in $M. Writes PROCESSED_DATA_DIR / "company_fundamentals.parquet"
    and returns the DataFrame.

    Fix vs. previous version: uses a local ``np.random.RandomState(42)``
    instead of ``np.random.seed(42)``, which mutated the *global* NumPy
    RNG as a side effect for all other callers. The legacy RandomState
    with the same seed yields the identical draw sequence, so the
    generated numbers are unchanged.
    """
    import numpy as np
    logger.info("Generating company fundamentals dataset...")
    rng = np.random.RandomState(42)  # local, deterministic; no global side effect
    # per-sector company/ticker lists (parallel, index-aligned) plus the
    # revenue range ($M) base revenues are drawn from and a yearly growth rate
    sectors = {
        "Technology": {
            "companies": [
                "Apple", "Microsoft", "Google", "Amazon", "Meta",
                "NVIDIA", "Tesla", "Intel", "AMD", "Salesforce",
                "Adobe", "Oracle", "IBM", "Netflix", "Cisco",
                "PayPal", "Qualcomm", "Broadcom", "ServiceNow", "Intuit",
                "Uber", "Snap", "Pinterest", "Dropbox", "Zoom",
                "CrowdStrike", "Datadog", "Snowflake", "Palantir", "Twilio",
            ],
            "tickers": [
                "AAPL", "MSFT", "GOOGL", "AMZN", "META",
                "NVDA", "TSLA", "INTC", "AMD", "CRM",
                "ADBE", "ORCL", "IBM", "NFLX", "CSCO",
                "PYPL", "QCOM", "AVGO", "NOW", "INTU",
                "UBER", "SNAP", "PINS", "DBX", "ZM",
                "CRWD", "DDOG", "SNOW", "PLTR", "TWLO",
            ],
            "revenue_range": (5000, 120000),
            "growth_rate": 0.08,
        },
        "Healthcare": {
            "companies": [
                "Johnson & Johnson", "UnitedHealth", "Pfizer", "AbbVie", "Eli Lilly",
                "Merck", "Thermo Fisher", "Abbott Labs", "Danaher", "Amgen",
                "Gilead Sciences", "Regeneron", "Vertex", "Moderna", "BioNTech",
                "Humana", "Cigna", "CVS Health", "Anthem", "HCA Healthcare",
                "Medtronic", "Boston Scientific", "Edwards Lifesciences", "Stryker", "Baxter",
            ],
            "tickers": [
                "JNJ", "UNH", "PFE", "ABBV", "LLY",
                "MRK", "TMO", "ABT", "DHR", "AMGN",
                "GILD", "REGN", "VRTX", "MRNA", "BNTX",
                "HUM", "CI", "CVS", "ANTM", "HCA",
                "MDT", "BSX", "EW", "SYK", "BAX",
            ],
            "revenue_range": (3000, 90000),
            "growth_rate": -0.02,
        },
        "Finance": {
            "companies": [
                "JPMorgan Chase", "Bank of America", "Wells Fargo", "Goldman Sachs", "Morgan Stanley",
                "Citigroup", "US Bancorp", "Charles Schwab", "BlackRock", "State Street",
                "Capital One", "American Express", "Visa", "Mastercard", "Discover",
                "Berkshire Hathaway", "MetLife", "Prudential", "AIG", "Aflac",
            ],
            "tickers": [
                "JPM", "BAC", "WFC", "GS", "MS",
                "C", "USB", "SCHW", "BLK", "STT",
                "COF", "AXP", "V", "MA", "DFS",
                "BRK.B", "MET", "PRU", "AIG", "AFL",
            ],
            "revenue_range": (8000, 150000),
            "growth_rate": 0.04,
        },
        "Energy": {
            "companies": [
                "ExxonMobil", "Chevron", "ConocoPhillips", "Schlumberger", "EOG Resources",
                "Pioneer Natural", "Devon Energy", "Marathon Petroleum", "Valero", "Phillips 66",
                "Occidental Petroleum", "Hess", "Halliburton", "Baker Hughes", "Kinder Morgan",
            ],
            "tickers": [
                "XOM", "CVX", "COP", "SLB", "EOG",
                "PXD", "DVN", "MPC", "VLO", "PSX",
                "OXY", "HES", "HAL", "BKR", "KMI",
            ],
            "revenue_range": (5000, 100000),
            "growth_rate": 0.03,
        },
        "Consumer": {
            "companies": [
                "Walmart", "Procter & Gamble", "Coca-Cola", "PepsiCo", "Costco",
                "Nike", "McDonald's", "Starbucks", "Home Depot", "Lowe's",
                "Target", "Dollar General", "Colgate-Palmolive", "Estée Lauder", "Mondelez",
            ],
            "tickers": [
                "WMT", "PG", "KO", "PEP", "COST",
                "NKE", "MCD", "SBUX", "HD", "LOW",
                "TGT", "DG", "CL", "EL", "MDLZ",
            ],
            "revenue_range": (4000, 160000),
            "growth_rate": 0.03,
        },
    }
    rows = []
    years = range(2020, 2025)
    quarters = ["Q1", "Q2", "Q3", "Q4"]
    for sector_name, info in sectors.items():
        for i, company in enumerate(info["companies"]):
            # each company gets a base revenue drawn from the sector range
            base_rev = rng.uniform(*info["revenue_range"])
            for year in years:
                for qi, quarter in enumerate(quarters):
                    # apply yearly growth + seasonal variation + noise
                    year_mult = (1 + info["growth_rate"]) ** (year - 2020)
                    # ±5% sinusoidal seasonal swing: with this phase the
                    # peak lands in Q2 and the trough in Q4
                    seasonal = 1.0 + 0.05 * np.sin(2 * np.pi * qi / 4)
                    noise = rng.normal(1.0, 0.05)
                    revenue = base_rev * year_mult * seasonal * noise
                    # net income is some fraction of revenue with its own noise
                    margin = rng.uniform(0.05, 0.25)
                    net_income = revenue * margin * rng.normal(1.0, 0.1)
                    # total assets ~ 3-8x revenue
                    total_assets = revenue * rng.uniform(3, 8)
                    # market cap ~ 5-25x net income (abs: income noise can go negative)
                    market_cap = abs(net_income) * rng.uniform(5, 25)
                    rows.append({
                        "company_name": company,
                        "ticker": info["tickers"][i],
                        "sector": sector_name,
                        "year": year,
                        "quarter": quarter,
                        "revenue_mn": round(revenue, 2),
                        "net_income_mn": round(net_income, 2),
                        "total_assets_mn": round(total_assets, 2),
                        "market_cap_mn": round(market_cap, 2),
                    })
    df = pd.DataFrame(rows)
    out_path = PROCESSED_DATA_DIR / "company_fundamentals.parquet"
    df.to_parquet(out_path, index=False)
    logger.info(f"Saved {len(df)} company-quarter records to {out_path}")
    return df
def run_ingestion():
    """Entry point: build all processed datasets and report row counts."""
    # every writer below targets this directory, so create it up front
    PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True)
    news_df = download_financial_news()
    companies_df = generate_company_fundamentals()
    summary = (
        f"Ingestion complete: {len(news_df)} articles, "
        f"{len(companies_df)} company-quarter records"
    )
    logger.info(summary)
    return news_df, companies_df


if __name__ == "__main__":
    run_ingestion()