# Jitendra12421's picture
# Upload 9 files
# 8eb5198 verified
# ============================================================
# stock_news_lgbm.py — Proper LightGBM News Impact Model
# No leakage, proper train/test split, real validation
# ============================================================
import asyncio
import aiohttp
import xml.etree.ElementTree as ET
import ssl
import re
import os
import glob
import pickle
import warnings
import numpy as np
import pandas as pd
import yfinance as yf
import lightgbm as lgb
import nest_asyncio
from datetime import datetime, timedelta
from email.utils import parsedate_to_datetime
from bs4 import BeautifulSoup
from textblob import TextBlob
from sklearn.metrics import (
accuracy_score, precision_score, recall_score,
f1_score, roc_auc_score, mean_squared_error,
classification_report, confusion_matrix
)
# Suppress every warning category process-wide.
# NOTE(review): this also hides deprecation warnings from the libraries
# imported above — confirm that is intended.
warnings.filterwarnings("ignore")
# Patch asyncio via nest_asyncio; presumably so the asyncio.run(main()) call
# at the bottom of this file also works inside an already-running event loop
# (e.g. a notebook) — TODO confirm.
nest_asyncio.apply()
# ============================================================
# SCRAPER (your original, untouched)
# ============================================================
class NewsScraper:
    """Collect Google News RSS headlines for a ticker.

    A fixed set of search phrases is built around the ticker, each phrase is
    turned into a Google News RSS search URL, the feeds are fetched in
    batches and the resulting items are de-duplicated by link.

    Args:
        limit: Maximum number of articles returned by :meth:`scrape`.
    """

    def __init__(self, limit=600):
        self.limit = limit
        # SECURITY NOTE: hostname checking and certificate verification are
        # switched off for every request made with this context. That keeps
        # the scraper working behind intercepting proxies but removes TLS
        # authentication of the remote server.
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE

    async def fetch_feed(self, session, url):
        """Download one feed and return its text, or "" on any failure.

        Only network/timeout/decoding errors are swallowed (best effort);
        task cancellation is deliberately allowed to propagate.
        """
        try:
            async with session.get(
                url, timeout=aiohttp.ClientTimeout(total=8)
            ) as resp:
                if resp.status == 200:
                    return await resp.text()
        except (aiohttp.ClientError, asyncio.TimeoutError, UnicodeDecodeError):
            pass
        return ""

    def parse_feed(self, xml_text, lookback_date):
        """Parse RSS text into article dicts published on/after a date.

        Args:
            xml_text: Raw RSS/XML document; empty or malformed input yields
                an empty list.
            lookback_date: ``datetime`` (or ``date``); items whose pubDate
                falls on an earlier calendar day are dropped.

        Returns:
            List of dicts with keys ``title``, ``link``, ``pub_date`` (the
            raw pubDate string) and ``timestamp`` (ISO-8601 string).
        """
        if not xml_text:
            return []
        # Accept both datetime and plain date objects for the threshold.
        if isinstance(lookback_date, datetime):
            earliest = lookback_date.date()
        else:
            earliest = lookback_date
        try:
            root = ET.fromstring(xml_text)
        except (ET.ParseError, ValueError):
            return []
        articles = []
        for item in root.findall('.//item'):
            title = item.findtext('title')
            link = item.findtext('link')
            pub = item.findtext('pubDate')
            if not (title and link and pub):
                continue
            try:
                published = parsedate_to_datetime(pub)
            except (TypeError, ValueError, IndexError, OverflowError):
                # Unparseable date: skip just this item.
                continue
            if published.date() >= earliest:
                articles.append({
                    'title': title, 'link': link,
                    'pub_date': pub,
                    'timestamp': published.isoformat()
                })
        return articles

    def _build_queries(self, ticker):
        """Return the list of search phrases generated for ``ticker``."""
        t = ticker
        return [
            t, f"{t} stock", f"{t} news", f"{t} market",
            f"{t} earnings", f"{t} analyst", f"{t} forecast",
            f"{t} price target", f"{t} options", f"{t} technical",
            f"{t} dividend", f"{t} industry", f"{t} competitor",
            f"{t} share price", f"{t} hedge fund",
            f"{t} institutional",
            f"{t} buy sell hold", f"{t} upgrade downgrade",
            f"{t} outperform underperform",
            f"{t} bullish bearish", f"{t} momentum",
            f"{t} breakout breakdown", f"{t} rally crash",
            f"{t} surge plunge", f"{t} soar tumble",
            f"{t} gains losses", f"{t} beat miss expectations",
            f"{t} CEO news", f"{t} quarterly results",
            f"{t} revenue profit", f"{t} guidance outlook",
            f"{t} acquisition merger", f"{t} lawsuit legal SEC",
            f"{t} insider trading", f"{t} buyback repurchase",
            f"{t} partnership deal", f"{t} product launch",
            f"{t} layoffs restructuring", f"{t} expansion growth",
            f"{t} wall street", f"{t} analyst rating",
            f"{t} price prediction", f"{t} short interest",
            f"{t} short squeeze", f"{t} put call ratio",
            f"{t} sector outlook", f"{t} industry trend",
            f"{t} supply chain", f"{t} regulation policy",
            f"{t} inflation impact", f"{t} interest rate",
            f"{t} today", f"{t} this week", f"{t} latest",
            f"{t} breaking news", f"{t} update",
            f"{t} premarket", f"{t} after hours",
        ]

    async def scrape(self, ticker, lookback_date, progress_cb=None):
        """Fetch all query feeds and return up to ``limit`` unique articles.

        Args:
            ticker: Symbol used to build the search phrases.
            lookback_date: Oldest publication day to keep.
            progress_cb: Optional ``callable(fraction, count)`` invoked after
                each batch of feeds.

        Returns:
            List of article dicts (see :meth:`parse_feed`), unique by link.
        """
        # Local import: percent-encode the query so characters such as
        # '&' or '^' in a ticker cannot corrupt the URL's query string.
        from urllib.parse import quote_plus

        urls = [
            f"https://news.google.com/rss/search?q={quote_plus(q)}"
            f"&hl=en-US&gl=US&ceid=US:en"
            for q in self._build_queries(ticker)
        ]
        all_articles = []
        seen = set()
        conn = aiohttp.TCPConnector(limit=50, ssl=self.ssl_context)
        async with aiohttp.ClientSession(connector=conn) as session:
            for i in range(0, len(urls), 20):
                if len(all_articles) >= self.limit:
                    break
                batch = urls[i:i + 20]
                results = await asyncio.gather(
                    *(self.fetch_feed(session, u) for u in batch),
                    return_exceptions=True
                )
                for xml in results:
                    # gather() may hand back exception objects; only
                    # non-empty strings are feeds.
                    if not isinstance(xml, str) or not xml:
                        continue
                    for a in self.parse_feed(xml, lookback_date):
                        if a['link'] not in seen:
                            seen.add(a['link'])
                            all_articles.append(a)
                if progress_cb:
                    progress_cb(
                        min(len(all_articles) / self.limit, 1.0),
                        len(all_articles)
                    )
                # Short pause between batches.
                await asyncio.sleep(0.1)
        print(f"[Scraper] {len(all_articles)} unique articles")
        return all_articles[:self.limit]
# ============================================================
# CONTENT EXTRACTOR
# ============================================================
class ContentExtractor:
    """Download article pages and reduce them to plain paragraph text."""

    def __init__(self):
        # SECURITY NOTE: TLS hostname checking and certificate verification
        # are disabled for all page downloads made with this context.
        self.ssl_ctx = ssl.create_default_context()
        self.ssl_ctx.check_hostname = False
        self.ssl_ctx.verify_mode = ssl.CERT_NONE

    async def _fetch_one(self, session, url):
        """Return extracted text for ``url``, or "" when it cannot be fetched.

        Network, timeout and decoding errors are treated as "no content";
        task cancellation is allowed to propagate.
        """
        try:
            async with session.get(
                url, timeout=aiohttp.ClientTimeout(total=10),
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
                },
                allow_redirects=True
            ) as resp:
                if resp.status == 200:
                    html = await resp.text()
                    return self._parse_html(html)
        except (aiohttp.ClientError, asyncio.TimeoutError, UnicodeDecodeError):
            pass
        return ""

    def _parse_html(self, html):
        """Extract paragraph text from an HTML document.

        Non-content tags are removed, then ``<p>`` elements (inside the first
        ``<article>`` when one exists, otherwise anywhere) longer than 30
        characters are joined with spaces. The result is capped at 3000
        characters. Any parsing failure yields "".
        """
        try:
            soup = BeautifulSoup(html, 'html.parser')
            for tag in soup(
                ['script', 'style', 'nav', 'header', 'footer',
                 'aside', 'iframe', 'noscript', 'form']
            ):
                tag.decompose()
            container = soup.find('article') or soup
            parts = []
            for p in container.find_all('p'):
                txt = p.get_text(strip=True)
                if len(txt) > 30:
                    parts.append(txt)
            return ' '.join(parts)[:3000]
        except Exception:
            # Best effort: a page that cannot be parsed simply has no body.
            return ""

    async def extract_all(self, articles):
        """Fill ``article['content']`` for every article dict, in place.

        Pages are fetched ten at a time with a short pause between batches.
        Articles whose page could not be fetched get an empty string.

        Args:
            articles: List of dicts, each with a ``link`` key.

        Returns:
            The same list object, with ``content`` set on every element.
        """
        if not articles:
            # Nothing to fetch: avoid opening a session at all.
            return articles
        conn = aiohttp.TCPConnector(limit=25, ssl=self.ssl_ctx)
        async with aiohttp.ClientSession(connector=conn) as session:
            for start in range(0, len(articles), 10):
                batch = articles[start:start + 10]
                results = await asyncio.gather(
                    *(self._fetch_one(session, a['link']) for a in batch),
                    return_exceptions=True
                )
                for article, content in zip(batch, results):
                    # gather() may return exception objects; map them to "".
                    article['content'] = (
                        content if isinstance(content, str) else ""
                    )
                await asyncio.sleep(0.15)
        return articles
# ============================================================
# FEATURE ENGINEERING — pure numerical, no leakage
# ============================================================
class Features:
    """Turn article title/body/timestamp into numeric model features.

    Attributes:
        BULL / BEAR / URGENT: keyword lists counted in the lower-cased text.
        SOURCES: publisher name -> tier score, matched against the title.
    """

    BULL = [
        'upgrade', 'buy', 'outperform', 'beat', 'surge', 'soar',
        'rally', 'breakout', 'strong', 'growth', 'profit', 'record',
        'bullish', 'positive', 'raise', 'upside', 'optimistic',
        'boom', 'gain', 'higher', 'best', 'partnership', 'deal',
        'approval', 'dividend', 'buyback', 'expansion', 'launch',
        'breakthrough', 'recovery', 'momentum', 'confidence',
    ]
    BEAR = [
        'downgrade', 'sell', 'underperform', 'miss', 'plunge',
        'crash', 'tumble', 'breakdown', 'weak', 'decline', 'loss',
        'bearish', 'negative', 'cut', 'lower', 'worst', 'risk',
        'warning', 'concern', 'fear', 'recession', 'lawsuit',
        'investigation', 'fraud', 'layoff', 'restructuring',
        'debt', 'default', 'bankruptcy', 'slump', 'drop', 'falling',
        'disappointing', 'headwind', 'pressure', 'downturn',
        'uncertainty', 'volatile', 'overvalued',
    ]
    URGENT = [
        'breaking', 'alert', 'urgent', 'exclusive', 'flash',
        'developing', 'critical', 'emergency',
    ]
    SOURCES = {
        'reuters': 3, 'bloomberg': 3, 'wsj': 3,
        'wall street journal': 3, 'financial times': 3,
        'cnbc': 2, 'marketwatch': 2, 'seeking alpha': 2,
        'barrons': 2, 'yahoo finance': 2,
        'benzinga': 1, 'motley fool': 1, 'zacks': 1, 'tipranks': 1,
    }

    def __init__(self, ticker):
        # Stored lower-cased because it is matched against lower-cased text.
        self.ticker = ticker.lower()

    def build(self, df):
        """Extract all features. Input must have: title, content, timestamp.

        Args:
            df: DataFrame with ``title``, ``content`` and ``timestamp``
                columns (missing title/content values are treated as "").

        Returns:
            A copy of ``df`` with the feature columns appended; the input
            frame is not modified.
        """
        df = df.copy()
        df['content'] = df['content'].fillna('')
        df['title'] = df['title'].fillna('')
        text = (df['title'] + ' ' + df['content']).str.lower()
        title_low = df['title'].str.lower()
        # Title and body glued without a separator; only used for the
        # punctuation counts below.
        raw_joined = df['title'] + df['content']

        def count_hits(words):
            # Number of listed words occurring as a SUBSTRING of the text.
            # NOTE(review): substring matching means e.g. 'gain' also hits
            # 'against' — confirm this is acceptable.
            return text.apply(lambda x: sum(w in x for w in words))

        # --- Sentiment ---
        # Analyse every text once and read both polarity and subjectivity
        # from the same result instead of running TextBlob twice per text.
        title_sent = [TextBlob(str(x)).sentiment for x in df['title']]
        body_sent = [TextBlob(str(x)).sentiment for x in df['content']]
        df['sent_title'] = pd.Series(
            [s.polarity for s in title_sent], index=df.index, dtype=float
        )
        df['subj_title'] = pd.Series(
            [s.subjectivity for s in title_sent], index=df.index, dtype=float
        )
        df['sent_content'] = pd.Series(
            [s.polarity for s in body_sent], index=df.index, dtype=float
        )
        df['subj_content'] = pd.Series(
            [s.subjectivity for s in body_sent], index=df.index, dtype=float
        )
        # Weighted combo: title matters more for headlines
        df['sent_combined'] = (
            df['sent_title'] * 0.6 + df['sent_content'] * 0.4
        )
        df['sent_abs'] = df['sent_combined'].abs()
        # Agreement between title and body sentiment (positive when both
        # have the same sign)
        df['sent_agreement'] = df['sent_title'] * df['sent_content']

        # --- Keyword counts ---
        df['n_bull'] = count_hits(self.BULL)
        df['n_bear'] = count_hits(self.BEAR)
        df['n_urgent'] = count_hits(self.URGENT)
        df['bull_bear_net'] = df['n_bull'] - df['n_bear']
        # Denominator clipped to 1 so zero bearish hits do not divide by zero.
        df['bull_bear_ratio'] = df['n_bull'] / df['n_bear'].clip(lower=1)
        # Sentiment-keyword alignment: are they saying the same thing?
        df['sent_kw_align'] = df['sent_combined'] * df['bull_bear_net']

        # --- Text structure ---
        df['len_title'] = df['title'].str.len()
        df['len_content'] = df['content'].str.len()
        df['words_title'] = df['title'].str.split().str.len().fillna(0)
        df['words_content'] = df['content'].str.split().str.len().fillna(0)
        df['n_excl'] = raw_joined.str.count('!')
        df['n_quest'] = raw_joined.str.count(r'\?')
        df['caps_ratio'] = df['title'].apply(
            lambda x: sum(c.isupper() for c in str(x))
            / max(len(str(x)), 1)
        )
        df['n_numbers'] = text.apply(
            lambda x: len(re.findall(r'\d+\.?\d*', x))
        )
        df['n_dollar'] = text.apply(
            lambda x: len(re.findall(r'\$[\d,.]+', x))
        )
        df['n_percent'] = text.apply(
            lambda x: len(re.findall(r'[\d.]+\s*%', x))
        )

        # --- Ticker relevance ---
        df['ticker_in_title'] = title_low.str.contains(
            self.ticker, regex=False
        ).astype(int)
        df['ticker_mentions'] = text.str.count(re.escape(self.ticker))

        # --- Source quality ---
        # Highest tier among the publisher names found in the title, else 0.
        df['source_tier'] = title_low.apply(
            lambda x: max(
                (tier for name, tier in self.SOURCES.items() if name in x),
                default=0
            )
        )

        # --- Temporal ---
        # Timestamps are normalised to UTC; unparseable ones fall back to
        # hour 12 / Monday.
        # NOTE(review): the market-hours flags below therefore use UTC
        # hours, not exchange-local hours — confirm intended.
        ts = pd.to_datetime(df['timestamp'], errors='coerce', utc=True)
        df['hour'] = ts.dt.hour.fillna(12).astype(int)
        df['dow'] = ts.dt.dayofweek.fillna(0).astype(int)
        df['is_weekend'] = (df['dow'] >= 5).astype(int)
        df['is_market_hrs'] = (
            (df['hour'] >= 9) & (df['hour'] <= 16)
        ).astype(int)
        df['is_premarket'] = (
            (df['hour'] >= 4) & (df['hour'] < 9)
        ).astype(int)

        # --- Topic flags ---
        # Despite the "is_" prefix these are hit COUNTS, not booleans.
        earn_words = [
            'earnings', 'eps', 'revenue', 'quarterly', 'q1', 'q2',
            'q3', 'q4', 'fiscal', 'results', 'guidance', 'outlook',
        ]
        analyst_words = [
            'analyst', 'rating', 'price target', 'upgrade',
            'downgrade', 'initiate', 'coverage', 'overweight',
            'underweight',
        ]
        df['is_earnings'] = count_hits(earn_words)
        df['is_analyst'] = count_hits(analyst_words)

        # --- Interaction features ---
        df['sent_x_urgent'] = df['sent_combined'] * df['n_urgent']
        df['sent_x_source'] = df['sent_combined'] * df['source_tier']
        df['sent_x_ticker'] = df['sent_combined'] * df['ticker_mentions']
        return df
# ============================================================
# CORE: DATA PIPELINE WITH STRICT TEMPORAL SPLIT
# ============================================================
class DataPipeline:
    """
    Handles the entire data flow with STRICT temporal ordering:
    - Train: articles published before the cutoff date
    - Test: articles published on or after the cutoff date
    - Training rows are sorted chronologically so that positional
      (walk-forward) splits downstream are genuinely temporal.

    After :meth:`build_dataset` has run, the labelled training frame is
    available as ``self.train_df`` (``None`` before that).
    """

    # These are the ONLY columns the model sees
    FEATURE_COLS = [
        'sent_title', 'subj_title', 'sent_content', 'subj_content',
        'sent_combined', 'sent_abs', 'sent_agreement',
        'n_bull', 'n_bear', 'n_urgent', 'bull_bear_net',
        'bull_bear_ratio', 'sent_kw_align',
        'len_title', 'len_content', 'words_title', 'words_content',
        'n_excl', 'n_quest', 'caps_ratio', 'n_numbers',
        'n_dollar', 'n_percent',
        'ticker_in_title', 'ticker_mentions', 'source_tier',
        'hour', 'dow', 'is_weekend', 'is_market_hrs', 'is_premarket',
        'is_earnings', 'is_analyst',
        'sent_x_urgent', 'sent_x_source', 'sent_x_ticker',
        # Market context (as of the latest trading day on or before the
        # article date — never a later one)
        'vol_5d', 'vol_change', 'price_sma5', 'price_sma20',
        'ret_1d', 'ret_5d',
    ]

    def __init__(self, ticker, train_days=120, test_days=14):
        """
        ticker: stock symbol (e.g. "^NSEI" for Nifty 50)
        train_days: how many days back for training articles
        test_days: how many recent days for test articles
        """
        self.ticker = ticker
        self.train_days = train_days
        self.test_days = test_days
        self.scraper = NewsScraper(limit=600)
        self.extractor = ContentExtractor()
        self.features = Features(ticker)
        # Labelled training frame, populated by build_dataset().
        self.train_df = None

    async def build_dataset(self):
        """
        Scrape, split, featurise and label the data.

        Returns:
            (X_train, y_dir, y_hh, y_ret, X_test, test_df, price_df) where
            y_dir is 1 when the labelled return is positive, y_hh is 1 when
            its absolute value exceeds 1.5%, and y_ret is the return itself.

        Raises:
            ValueError: no price data, no articles at all, or fewer than 20
                training articles.

        TEMPORAL SPLIT:
            train_start ------- cutoff_date ------- today
            |  TRAIN articles   |   TEST articles   |
            |  labelled         |   predicted only  |
        """
        # NOTE(review): `now` is naive local time while article dates are
        # derived from UTC timestamps — confirm the one-day skew this can
        # cause at the cutoff is acceptable.
        now = datetime.now()
        train_start = now - timedelta(days=self.train_days + self.test_days)
        cutoff_date = now - timedelta(days=self.test_days)
        test_start = cutoff_date
        print(f"\n{'='*60}")
        print(f" DATASET CONSTRUCTION")
        print(f" Ticker: {self.ticker}")
        print(f" Train range: {train_start.date()} → {cutoff_date.date()}")
        print(f" Test range: {cutoff_date.date()} → {now.date()}")
        print(f"{'='*60}")

        def show_progress(p, n):
            print(f"\r Scraping: {n} articles", end='', flush=True)

        # ---- 1. GET PRICE DATA ----
        print("\n[1/6] Downloading price data...")
        price_df = self._get_prices(train_start)
        if price_df.empty:
            raise ValueError(f"No price data for {self.ticker}")
        print(f" → {len(price_df)} trading days loaded")
        print(f" → Price range: {price_df['Close'].min():.2f} - {price_df['Close'].max():.2f}")

        # ---- 2. SCRAPE TRAINING ARTICLES ----
        print("\n[2/6] Scraping TRAINING articles...")
        train_articles = await self.scraper.scrape(
            self.ticker, train_start, progress_cb=show_progress
        )
        print(f"\n → {len(train_articles)} raw training articles")

        # ---- 3. SCRAPE TEST ARTICLES ----
        print("\n[3/6] Scraping TEST articles...")
        # Fresh scraper for the test window
        test_scraper = NewsScraper(limit=600)
        test_articles = await test_scraper.scrape(
            self.ticker, test_start, progress_cb=show_progress
        )
        print(f"\n → {len(test_articles)} raw test articles")

        # ---- 4. EXTRACT CONTENT ----
        print("\n[4/6] Extracting article content...")
        # The training scrape's window also covers the test window, so the
        # two result lists overlap. Keep one copy per link; otherwise recent
        # articles are fetched twice and show up twice in the test set.
        all_articles = []
        seen_links = set()
        for art in train_articles + test_articles:
            if art['link'] not in seen_links:
                seen_links.add(art['link'])
                all_articles.append(art)
        if not all_articles:
            raise ValueError(
                f"No articles found for {self.ticker}. "
                f"Check the ticker or the network connection."
            )
        all_articles = await self.extractor.extract_all(all_articles)
        n_with_content = sum(
            1 for a in all_articles if len(a.get('content', '')) > 50
        )
        print(f" → {n_with_content}/{len(all_articles)} have body text")

        # ---- 5. TEMPORAL SPLIT (strict, no leakage) ----
        print("\n[5/6] Applying strict temporal split...")
        df = pd.DataFrame(all_articles)
        df['ts'] = pd.to_datetime(
            df['timestamp'], errors='coerce', utc=True
        )
        df = df.dropna(subset=['ts'])
        # Chronological order: downstream walk-forward validation slices the
        # training rows by position and relies on them being time-sorted.
        df = df.sort_values('ts', kind='stable').reset_index(drop=True)
        df['date'] = df['ts'].dt.date
        # Strict split by date
        cutoff = cutoff_date.date()
        is_train = df['date'] < cutoff
        train_df = df[is_train].copy().reset_index(drop=True)
        test_df = df[~is_train].copy().reset_index(drop=True)
        # Remove any test articles that somehow appear in training
        train_links = set(train_df['link'])
        test_df = test_df[
            ~test_df['link'].isin(train_links)
        ].reset_index(drop=True)
        print(f" → Train articles: {len(train_df)}")
        print(f" → Test articles: {len(test_df)}")
        print(f" → Cutoff date: {cutoff}")
        print(f" → Train date range: {train_df['date'].min()} → {train_df['date'].max()}")
        print(f" → Test date range: {test_df['date'].min()} → {test_df['date'].max()}")
        if len(train_df) < 20:
            raise ValueError(
                f"Only {len(train_df)} training articles — need more data. "
                f"Try increasing train_days or checking if the ticker is valid."
            )

        # ---- 6. ENGINEER FEATURES + LABELS ----
        print("\n[6/6] Engineering features and labels...")
        train_df = self.features.build(train_df)
        test_df = self.features.build(test_df)
        # Market context as of the last trading day on/before each article
        train_df = self._add_market_context(train_df, price_df)
        test_df = self._add_market_context(test_df, price_df)
        # Add labels to training data ONLY
        train_df = self._add_labels(train_df, price_df)
        # Drop rows without labels (articles near the end with no next-day data)
        before = len(train_df)
        train_df = train_df.dropna(
            subset=['next_ret']
        ).reset_index(drop=True)
        print(f" → Dropped {before - len(train_df)} unlabeled training rows")
        print(f" → Final train size: {len(train_df)}")
        print(f" → Final test size: {len(test_df)}")
        # Keep the real training frame so callers can audit it.
        self.train_df = train_df

        # Prepare feature matrices
        avail_feats = [
            c for c in self.FEATURE_COLS if c in train_df.columns
        ]
        print(f" → Features used: {len(avail_feats)}")
        X_train = train_df[avail_feats].fillna(0).replace(
            [np.inf, -np.inf], 0
        )
        X_test = test_df[avail_feats].fillna(0).replace(
            [np.inf, -np.inf], 0
        )
        # Labels
        y_dir = (train_df['next_ret'] > 0).astype(int)
        y_hh = (train_df['next_ret'].abs() > 0.015).astype(int)
        y_ret = train_df['next_ret'].astype(float)
        # Print label distributions
        print(f"\n LABEL DISTRIBUTIONS (Training):")
        print(f" Direction UP: {y_dir.mean()*100:.1f}%")
        print(f" Heavy hitters: {y_hh.mean()*100:.1f}%")
        print(f" Mean return: {y_ret.mean()*100:.4f}%")
        print(f" Std return: {y_ret.std()*100:.4f}%")
        print(f" Min return: {y_ret.min()*100:.4f}%")
        print(f" Max return: {y_ret.max()*100:.4f}%")
        return X_train, y_dir, y_hh, y_ret, X_test, test_df, price_df

    def _get_prices(self, start_date):
        """Download OHLCV from yfinance and derive rolling market features.

        History starts 30 days before ``start_date`` so the rolling windows
        are populated by the time the first article appears. Returns the
        (possibly empty) frame with a plain ``date`` in ``Date``.
        """
        stock = yf.Ticker(self.ticker)
        df = stock.history(
            start=start_date - timedelta(days=30),  # extra buffer for rolling calcs
            end=datetime.now() + timedelta(days=2),
            auto_adjust=True
        )
        if df.empty:
            return df
        df = df.reset_index()
        # Handle timezone-aware dates from yfinance
        if getattr(df['Date'].dtype, 'tz', None) is not None:
            df['Date'] = df['Date'].dt.tz_localize(None)
        df['Date'] = pd.to_datetime(df['Date']).dt.date
        df['ret'] = df['Close'].pct_change()
        df['ret_5d'] = df['Close'].pct_change(5)
        df['vol_5d'] = df['ret'].rolling(5).std()
        df['vol_20d'] = df['ret'].rolling(20).std()
        # Day-over-day change in traded volume (not volatility).
        df['vol_change'] = df['Volume'].pct_change()
        df['sma5'] = df['Close'].rolling(5).mean()
        df['sma20'] = df['Close'].rolling(20).mean()
        df['price_sma5'] = df['Close'] / df['sma5'] - 1
        df['price_sma20'] = df['Close'] / df['sma20'] - 1
        return df

    def _add_market_context(self, articles_df, price_df):
        """
        Attach market features from the most recent trading day ON OR BEFORE
        each article's date.

        Articles dated before the first available trading day get NaN
        features rather than borrowing values from a later day, which would
        be look-ahead. Adds the columns ``prior_td``, ``vol_5d``,
        ``vol_change``, ``price_sma5``, ``price_sma20``, ``ret_1d`` and
        ``ret_5d`` to ``articles_df`` (modified in place) and returns it.
        """
        import bisect

        trading_dates = sorted(price_df['Date'].unique())

        def prior_trading_date(pub_date):
            # Index of the first trading day strictly after pub_date; the
            # element just before it is the latest day <= pub_date.
            pos = bisect.bisect_right(trading_dates, pub_date)
            return trading_dates[pos - 1] if pos > 0 else None

        articles_df['prior_td'] = [
            prior_trading_date(d) for d in articles_df['date']
        ]
        # Map market features from the PRIOR trading day
        price_map = price_df.set_index('Date')
        for col, src in [
            ('vol_5d', 'vol_5d'),
            ('vol_change', 'vol_change'),
            ('price_sma5', 'price_sma5'),
            ('price_sma20', 'price_sma20'),
            ('ret_1d', 'ret'),
            ('ret_5d', 'ret_5d'),
        ]:
            mapping = price_map[src].to_dict() if src in price_map.columns else {}
            articles_df[col] = articles_df['prior_td'].map(mapping)
        return articles_df

    def _add_labels(self, articles_df, price_df):
        """
        Label = close-to-close return over the trading day that follows the
        article's reference day.

        The reference day T is the article's date when that is a trading
        day, otherwise the first trading day after it. The label is
        close(T+1)/close(T) - 1, or NaN when either day or price is missing.
        Adds ``next_ret`` to ``articles_df`` (in place) and returns it.
        """
        import bisect

        trading_dates = sorted(price_df['Date'].unique())
        closes = price_df.set_index('Date')['Close'].to_dict()

        def next_day_return(pub_date):
            # First trading day >= pub_date (the date itself if it traded).
            i = bisect.bisect_left(trading_dates, pub_date)
            if i + 1 >= len(trading_dates):
                return np.nan
            c0 = closes.get(trading_dates[i])
            c1 = closes.get(trading_dates[i + 1])
            if c0 and c1 and c0 > 0:
                return c1 / c0 - 1
            return np.nan

        articles_df['next_ret'] = [
            next_day_return(d) for d in articles_df['date']
        ]
        return articles_df
# ============================================================
# MODEL: Proper LightGBM with validation
# ============================================================
class StockNewsModel:
    """
    Three LightGBM models:
    1. Direction: will next day be UP? (binary)
    2. Heavy hitter: will move be >1.5%? (binary)
    3. Return: predicted return magnitude (regression)

    Each model is first scored with walk-forward validation on the training
    rows (which must be in chronological order), then refitted on all
    training rows. The refitted models produce the test-set predictions.
    """

    # Walk-forward layout: the first 60% of rows is the minimum training
    # window; validation advances in 10% steps.
    MIN_TRAIN_PCT = 0.6
    VAL_SIZE_PCT = 0.1

    # Hyper-parameters shared by the classifiers and the regressor.
    _BASE_PARAMS = {
        'boosting_type': 'gbdt',
        'num_leaves': 24,
        'learning_rate': 0.03,
        'feature_fraction': 0.7,
        'bagging_fraction': 0.7,
        'bagging_freq': 5,
        'min_child_samples': 20,
        'reg_alpha': 0.3,
        'reg_lambda': 0.3,
        'verbose': -1,
        'n_estimators': 500,
        'random_state': 42,
    }

    def __init__(self):
        self.m_dir = None  # direction model
        self.m_hh = None  # heavy hitter model
        self.m_ret = None  # return model
        self.feature_names = None
        self.metrics = {}

    # ---------------------------------------------------------- helpers
    @classmethod
    def _walk_forward_splits(cls, n):
        """Yield ``(fold_no, train_end, val_end)`` for ``n`` ordered rows.

        Fold k trains on rows ``[0, train_end)`` and validates on
        ``[train_end, val_end)``. Folds with an empty validation slice are
        skipped; ``fold_no`` is 1-based.
        """
        n_folds = int((1.0 - cls.MIN_TRAIN_PCT) / cls.VAL_SIZE_PCT)
        for fold in range(n_folds):
            train_end = int(n * (cls.MIN_TRAIN_PCT + fold * cls.VAL_SIZE_PCT))
            val_end = int(
                n * (cls.MIN_TRAIN_PCT + (fold + 1) * cls.VAL_SIZE_PCT)
            )
            val_end = min(val_end, n)
            if train_end < val_end:
                yield fold + 1, train_end, val_end

    @staticmethod
    def _impact_score(hh_prob, ret, dir_prob):
        """Composite impact score.

        40% heavy-hitter probability, 30% absolute predicted return scaled
        by 20, 30% direction confidence (distance from 0.5, doubled).
        """
        return (
            hh_prob * 0.4
            + ret.abs() * 20 * 0.3
            + (dir_prob - 0.5).abs() * 2 * 0.3
        )

    @staticmethod
    def _print_importance(model, columns, name):
        """Print the ten most important features of ``model`` as a bar list."""
        imp = pd.DataFrame({
            'feature': columns,
            'importance': model.feature_importances_
        }).sort_values('importance', ascending=False)
        top = max(imp['importance'].max(), 1)
        print(f"\n Top 10 features ({name}):")
        for _, row in imp.head(10).iterrows():
            bar = '█' * int(row['importance'] / top * 20)
            print(f" {row['feature']:25s} {row['importance']:6.0f} {bar}")

    def _raw_predictions(self, X):
        """Run the three fitted models on ``X``; return arrays keyed by column.

        An empty ``X`` yields empty arrays without calling the models.
        """
        if len(X) == 0:
            return {
                'pred_dir': np.array([], dtype=int),
                'pred_dir_prob': np.array([], dtype=float),
                'pred_hh': np.array([], dtype=int),
                'pred_hh_prob': np.array([], dtype=float),
                'pred_ret': np.array([], dtype=float),
            }
        return {
            'pred_dir': self.m_dir.predict(X),
            'pred_dir_prob': self.m_dir.predict_proba(X)[:, 1],
            'pred_hh': self.m_hh.predict(X),
            'pred_hh_prob': self.m_hh.predict_proba(X)[:, 1],
            'pred_ret': self.m_ret.predict(X),
        }

    # ------------------------------------------------------- public API
    def train_and_evaluate(
        self, X_train, y_dir, y_hh, y_ret, X_test, test_df
    ):
        """
        1. Walk-forward CV on training data (internal validation)
        2. Train final models on ALL training data
        3. Predict on test set (never seen during training)

        Returns the test frame with prediction columns, sorted by impact.
        """
        self.feature_names = list(X_train.columns)
        n_train = len(X_train)
        n_test = len(X_test)
        print(f"\n{'='*60}")
        print(f" MODEL TRAINING")
        print(f" Training samples: {n_train}")
        print(f" Test samples: {n_test}")
        print(f" Features: {len(self.feature_names)}")
        print(f"{'='*60}")
        # ============================================
        # MODEL 1: DIRECTION CLASSIFIER
        # ============================================
        print(f"\n{'─'*60}")
        print(f" Model 1: DIRECTION (Up/Down Tomorrow)")
        print(f"{'─'*60}")
        self.m_dir = self._train_classifier(
            X_train, y_dir, "direction"
        )
        # ============================================
        # MODEL 2: HEAVY HITTER CLASSIFIER
        # ============================================
        print(f"\n{'─'*60}")
        print(f" Model 2: HEAVY HITTER (>1.5% Move)")
        print(f"{'─'*60}")
        # Handle imbalance: weight positives by the negative/positive ratio
        n_pos = y_hh.sum()
        n_neg = len(y_hh) - n_pos
        scale = n_neg / max(n_pos, 1)
        print(f" Class balance: {n_pos} positive / {n_neg} negative")
        print(f" scale_pos_weight: {scale:.2f}")
        self.m_hh = self._train_classifier(
            X_train, y_hh, "heavy_hitter",
            extra_params={'scale_pos_weight': scale}
        )
        # ============================================
        # MODEL 3: RETURN REGRESSOR
        # ============================================
        print(f"\n{'─'*60}")
        print(f" Model 3: RETURN REGRESSOR")
        print(f"{'─'*60}")
        self.m_ret = self._train_regressor(X_train, y_ret)
        # ============================================
        # TEST SET EVALUATION
        # ============================================
        print(f"\n{'='*60}")
        print(f" TEST SET PREDICTIONS ({n_test} articles)")
        print(f"{'='*60}")
        return self._predict(X_test, test_df)

    def _train_classifier(self, X, y, name, extra_params=None):
        """Walk-forward CV + final model.

        CV means are stored under ``self.metrics[name]`` (keys ``cv_acc``,
        ``cv_auc``, ``cv_f1``) when at least one fold could be scored.
        Returns the classifier refitted on all rows.
        """
        params = dict(
            self._BASE_PARAMS, objective='binary', metric='binary_logloss'
        )
        if extra_params:
            params.update(extra_params)
        n = len(X)
        n_folds = int((1.0 - self.MIN_TRAIN_PCT) / self.VAL_SIZE_PCT)
        fold_results = []
        print(f"\n Walk-forward validation ({n_folds} folds):")
        for fold_no, train_end, val_end in self._walk_forward_splits(n):
            X_tr, y_tr = X.iloc[:train_end], y.iloc[:train_end]
            X_vl, y_vl = X.iloc[train_end:val_end], y.iloc[train_end:val_end]
            # Too few rows or a single class: metrics would be meaningless.
            if len(y_vl) < 5 or y_vl.nunique() < 2:
                continue
            mdl = lgb.LGBMClassifier(**params)
            mdl.fit(
                X_tr, y_tr,
                eval_set=[(X_vl, y_vl)],
                callbacks=[
                    lgb.early_stopping(30, verbose=False),
                    lgb.log_evaluation(0),
                ],
            )
            pred = mdl.predict(X_vl)
            prob = mdl.predict_proba(X_vl)[:, 1]
            acc = accuracy_score(y_vl, pred)
            try:
                auc = roc_auc_score(y_vl, prob)
            except ValueError:
                # AUC undefined for this slice: report chance level.
                auc = 0.5
            f1 = f1_score(y_vl, pred, zero_division=0)
            fold_results.append({
                'acc': acc, 'auc': auc, 'f1': f1,
                'train_n': len(X_tr), 'val_n': len(X_vl)
            })
            print(
                f" Fold {fold_no}: Acc={acc:.4f} AUC={auc:.4f} "
                f"F1={f1:.4f} (train={len(X_tr)}, val={len(X_vl)})"
            )
        if fold_results:
            avg_acc = np.mean([f['acc'] for f in fold_results])
            avg_auc = np.mean([f['auc'] for f in fold_results])
            avg_f1 = np.mean([f['f1'] for f in fold_results])
            print(f"\n CV MEAN: Acc={avg_acc:.4f} AUC={avg_auc:.4f} F1={avg_f1:.4f}")
            self.metrics[name] = {
                'cv_acc': avg_acc, 'cv_auc': avg_auc, 'cv_f1': avg_f1
            }
        # Train final model on ALL training data
        print(f"\n Training final model on all {n} samples...")
        final = lgb.LGBMClassifier(**params)
        final.fit(X, y)
        self._print_importance(final, X.columns, name)
        return final

    def _train_regressor(self, X, y):
        """Walk-forward CV for regression, then refit on all rows.

        CV means are stored under ``self.metrics['return']`` (keys
        ``cv_rmse``, ``cv_dir_acc``, ``cv_corr``).
        """
        params = dict(
            self._BASE_PARAMS, objective='regression', metric='rmse'
        )
        n = len(X)
        n_folds = int((1.0 - self.MIN_TRAIN_PCT) / self.VAL_SIZE_PCT)
        print(f"\n Walk-forward validation ({n_folds} folds):")
        fold_results = []
        for fold_no, train_end, val_end in self._walk_forward_splits(n):
            X_tr, y_tr = X.iloc[:train_end], y.iloc[:train_end]
            X_vl, y_vl = X.iloc[train_end:val_end], y.iloc[train_end:val_end]
            if len(y_vl) < 5:
                continue
            mdl = lgb.LGBMRegressor(**params)
            mdl.fit(
                X_tr, y_tr,
                eval_set=[(X_vl, y_vl)],
                callbacks=[
                    lgb.early_stopping(30, verbose=False),
                    lgb.log_evaluation(0),
                ],
            )
            pred = mdl.predict(X_vl)
            rmse = np.sqrt(mean_squared_error(y_vl, pred))
            # Direction accuracy from regression
            dir_acc = np.mean(np.sign(pred) == np.sign(y_vl))
            corr = np.corrcoef(y_vl, pred)[0, 1] if len(y_vl) > 2 else 0
            if not np.isfinite(corr):
                # Constant predictions/targets give NaN correlation; count
                # that as "no correlation" so the CV mean stays finite.
                corr = 0.0
            fold_results.append({
                'rmse': rmse, 'dir_acc': dir_acc, 'corr': corr
            })
            print(
                f" Fold {fold_no}: RMSE={rmse:.6f} "
                f"DirAcc={dir_acc:.4f} Corr={corr:.4f}"
            )
        if fold_results:
            avg_rmse = np.mean([f['rmse'] for f in fold_results])
            avg_dir = np.mean([f['dir_acc'] for f in fold_results])
            avg_corr = np.mean([f['corr'] for f in fold_results])
            print(f"\n CV MEAN: RMSE={avg_rmse:.6f} DirAcc={avg_dir:.4f} Corr={avg_corr:.4f}")
            self.metrics['return'] = {
                'cv_rmse': avg_rmse, 'cv_dir_acc': avg_dir, 'cv_corr': avg_corr
            }
        print(f"\n Training final model on all {n} samples...")
        final = lgb.LGBMRegressor(**params)
        final.fit(X, y)
        self._print_importance(final, X.columns, "return")
        return final

    def _predict(self, X_test, test_df):
        """Run all 3 models on test data.

        Returns a copy of ``test_df`` with prediction columns added, sorted
        by ``impact`` descending. An empty test set yields an empty frame
        with the same columns.
        """
        res = test_df.copy()
        raw = self._raw_predictions(X_test)
        # Direction
        res['pred_dir'] = raw['pred_dir']
        res['pred_dir_prob'] = raw['pred_dir_prob']
        res['pred_dir_label'] = res['pred_dir'].map(
            {1: 'UP', 0: 'DOWN'}
        )
        # Heavy hitter
        res['pred_hh'] = raw['pred_hh']
        res['pred_hh_prob'] = raw['pred_hh_prob']
        # Return
        res['pred_ret'] = raw['pred_ret']
        res['pred_ret_pct'] = res['pred_ret'] * 100
        # Composite impact score
        res['impact'] = self._impact_score(
            res['pred_hh_prob'], res['pred_ret'], res['pred_dir_prob']
        )
        return res.sort_values('impact', ascending=False)

    def predict_new(self, X):
        """Predict on brand new data; returns a frame indexed like ``X``."""
        res = pd.DataFrame(index=X.index)
        for col, values in self._raw_predictions(X).items():
            res[col] = values
        res['impact'] = self._impact_score(
            res['pred_hh_prob'], res['pred_ret'], res['pred_dir_prob']
        )
        return res

    def save(self, path):
        """Pickle the three models, feature names and CV metrics to ``path``."""
        with open(path, 'wb') as f:
            pickle.dump({
                'm_dir': self.m_dir,
                'm_hh': self.m_hh,
                'm_ret': self.m_ret,
                'features': self.feature_names,
                'metrics': self.metrics,
            }, f)
        print(f"Saved → {path}")
# ============================================================
# DISPLAY
# ============================================================
def display_predictions(results, ticker, top_n=25):
    """Show ranked predictions with heavy hitters highlighted.

    Prints summary counts, the first ``top_n`` rows of ``results`` (which
    the caller is expected to have sorted by impact) and an overall
    impact-weighted signal.

    Args:
        results: Frame with the prediction columns (``pred_dir``,
            ``pred_dir_prob``, ``pred_hh``, ``pred_ret``, ``impact``, ...).
        ticker: Symbol shown in the header.
        top_n: Number of rows to list.

    Returns:
        None. Prints "No predictions." and returns when ``results`` is empty.
    """
    total = len(results)
    if total == 0:
        print("No predictions.")
        return
    n_hh = (results['pred_hh'] == 1).sum()
    n_up = (results['pred_dir'] == 1).sum()
    n_down = total - n_up
    print(f"\n{'='*80}")
    print(f" PREDICTIONS: {ticker}")
    print(f" {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(f"{'='*80}")
    print(f" Articles analyzed: {total}")
    print(f" Heavy hitters: {n_hh} ({n_hh/total*100:.1f}%)")
    print(f" Bullish: {n_up} ({n_up/total*100:.1f}%)")
    print(f" Bearish: {n_down} ({n_down/total*100:.1f}%)")
    if 'pred_ret' in results.columns:
        print(f" Mean pred return: {results['pred_ret'].mean()*100:+.4f}%")
    # Top articles by impact
    print(f"\n{'─'*80}")
    print(f" TOP {min(top_n, total)} ARTICLES BY PREDICTED IMPACT")
    print(f"{'─'*80}")
    for rank, (_, r) in enumerate(results.head(top_n).iterrows(), start=1):
        title = str(r.get('title', ''))[:75]
        d = r.get('pred_dir_label', '?')
        hh = r.get('pred_hh_prob', 0)
        ret = r.get('pred_ret_pct', 0)
        imp = r.get('impact', 0)
        sent = r.get('sent_combined', 0)
        # Green for UP, red for anything else; flame marks heavy hitters.
        d_icon = "🟢" if d == "UP" else "🔴"
        h_icon = "🔥" if hh > 0.5 else " "
        print(f"\n {rank:2d}. {h_icon}{d_icon} {title}")
        print(
            f" Impact={imp:.3f} HH={hh:.3f} "
            f"Ret={ret:+.3f}% Sent={sent:+.3f}"
        )
    # Overall signal: impact-weighted averages, or plain means when all
    # impact scores sum to zero.
    print(f"\n{'='*80}")
    impact_total = results['impact'].sum()
    if impact_total > 0:
        w_dir = (
            (results['pred_dir_prob'] * results['impact']).sum()
            / impact_total
        )
        w_ret = (
            (results['pred_ret'] * results['impact']).sum()
            / impact_total
        ) * 100
    else:
        w_dir = results['pred_dir_prob'].mean()
        w_ret = results['pred_ret'].mean() * 100
    if w_dir > 0.55:
        sig, icon = "BULLISH", "🟢"
    elif w_dir < 0.45:
        sig, icon = "BEARISH", "🔴"
    else:
        sig, icon = "NEUTRAL", "⚪"
    print(f" {icon} SIGNAL: {sig} (prob={w_dir:.3f})")
    print(f" 📊 WEIGHTED RETURN: {w_ret:+.4f}%")
    print(f"{'='*80}")
# ============================================================
# LEAKAGE AUDIT — run this to verify no contamination
# ============================================================
def audit_leakage(X_train, X_test, train_df, test_df):
    """Verify there is NO data leakage between train and test.

    Checks performed:
      1. every training date is strictly before every test date
         (vacuously true when either frame is empty),
      2. no article link occurs in both frames,
      3. no feature column name contains 'next', 'future', 'target' or
         'label',
      4. train and test matrices have the same number of columns.
    A train/test mean comparison of the first ten features is printed for
    information only.

    Args:
        X_train, X_test: Feature matrices.
        train_df, test_df: Frames with ``date`` and ``link`` columns.

    Returns:
        True when all four checks pass, otherwise False.
    """
    def mark(ok):
        return '✓' if ok else '✗'

    print(f"\n{'='*60}")
    print(f" DATA LEAKAGE AUDIT")
    print(f"{'='*60}")
    # 1. Check temporal ordering
    # min()/max() of an empty column is NaN, which cannot be compared with
    # a date, so empty frames are handled explicitly.
    train_max = train_df['date'].max() if len(train_df) else None
    test_min = test_df['date'].min() if len(test_df) else None
    if train_max is None or test_min is None:
        ok1 = True
    else:
        ok1 = bool(train_max < test_min)
    print(f" [{mark(ok1)}] Train max date ({train_max}) < Test min date ({test_min})")
    # 2. Check no shared articles
    overlap = set(train_df['link']) & set(test_df['link'])
    ok2 = not overlap
    print(f" [{mark(ok2)}] No shared articles (overlap={len(overlap)})")
    # 3. Check no future price data in features (by column name)
    suspicious = ('next', 'future', 'target', 'label')
    future_cols = [
        c for c in X_train.columns
        if any(token in c for token in suspicious)
    ]
    ok3 = not future_cols
    print(f" [{mark(ok3)}] No future-looking feature names (found: {future_cols})")
    # 4. Check train and test matrices have the same width
    ok4 = bool(X_train.shape[1] == X_test.shape[1])
    print(f" [{mark(ok4)}] Same feature count: train={X_train.shape[1]} test={X_test.shape[1]}")
    # 5. Feature statistics comparison (informational, not part of verdict)
    print(f"\n Feature statistics comparison:")
    print(f" {'Feature':25s} {'Train Mean':>12s} {'Test Mean':>12s} {'Ratio':>8s}")
    print(f" {'─'*57}")
    for col in X_train.columns[:10]:
        tr_mean = X_train[col].mean()
        te_mean = X_test[col].mean() if col in X_test.columns else float('nan')
        # Ratio is reported as 0 when the train mean is ~0.
        ratio = te_mean / tr_mean if abs(tr_mean) > 1e-6 else 0
        print(f" {col:25s} {tr_mean:12.4f} {te_mean:12.4f} {ratio:8.2f}")
    all_ok = ok1 and ok2 and ok3 and ok4
    print(f"\n {'✓ ALL CHECKS PASSED' if all_ok else '✗ LEAKAGE DETECTED'}")
    print(f"{'='*60}")
    return all_ok
# ============================================================
# MAIN
# ============================================================
async def main():
    """Run the whole pipeline for one ticker.

    Builds the dataset, audits it for leakage, trains the three models,
    prints the ranked predictions, then writes the pickled model and a CSV
    of predictions to the current directory.

    Returns:
        (results, model): the prediction frame and the fitted
        ``StockNewsModel``.
    """
    # ========== CONFIG ==========
    TICKER = "^NSEI"  # Nifty 50
    TRAIN_DAYS = 120  # 4 months of training articles
    TEST_DAYS = 14  # 2 weeks of test articles
    # ============================
    print(f"\n{'#'*60}")
    print(f" LightGBM NEWS IMPACT MODEL")
    print(f" Ticker: {TICKER}")
    print(f" Train: {TRAIN_DAYS} days | Test: {TEST_DAYS} days")
    print(f"{'#'*60}")
    # Build dataset with strict temporal split
    pipeline = DataPipeline(
        TICKER, train_days=TRAIN_DAYS, test_days=TEST_DAYS
    )
    (
        X_train, y_dir, y_hh, y_ret,
        X_test, test_df, price_df
    ) = await pipeline.build_dataset()

    # Audit for leakage BEFORE training.
    # Prefer the real training frame when the pipeline exposes one, so the
    # date-ordering and shared-link checks inspect actual data.
    real_train = getattr(pipeline, 'train_df', None)
    if real_train is not None and {'date', 'link'} <= set(real_train.columns):
        train_df_audit = real_train[['date', 'link']].copy()
    else:
        # Fallback: build a placeholder frame. Its dates and links are
        # synthetic, so audit checks 1 and 2 cannot fail in this mode.
        print(" [!] Training frame unavailable; audit uses placeholder "
              "dates/links (checks 1-2 are not meaningful)")
        cutoff = (datetime.now() - timedelta(days=TEST_DAYS)).date()
        train_df_audit = pd.DataFrame({
            'date': [cutoff - timedelta(days=1)] * len(X_train),
            'link': [f'train_{i}' for i in range(len(X_train))]
        })
    test_df_audit = test_df[['date', 'link']].copy()
    # Run leakage audit
    audit_leakage(X_train, X_test, train_df_audit, test_df_audit)

    # Train and evaluate
    model = StockNewsModel()
    results = model.train_and_evaluate(
        X_train, y_dir, y_hh, y_ret, X_test, test_df
    )
    # Display predictions
    display_predictions(results, TICKER, top_n=25)

    # File names drop the '^' index prefix from the ticker.
    file_stem = TICKER.replace("^", "")
    # Save model
    model.save(f'{file_stem}_model.pkl')
    # Save predictions CSV
    out_file = (
        f'{file_stem}_predictions_'
        f'{datetime.now().strftime("%Y%m%d")}.csv'
    )
    wanted = [
        'title', 'link', 'timestamp', 'date',
        'pred_dir_label', 'pred_dir_prob',
        'pred_hh', 'pred_hh_prob',
        'pred_ret_pct', 'impact',
        'sent_combined', 'n_bull', 'n_bear',
    ]
    save_cols = [c for c in wanted if c in results.columns]
    results[save_cols].to_csv(out_file, index=False)
    print(f"\nSaved → {out_file}")

    # Print final summary
    print(f"\n{'#'*60}")
    print(f" FINAL SUMMARY")
    print(f"{'#'*60}")
    print(f" Train samples: {len(X_train)}")
    print(f" Test samples: {len(X_test)}")
    print(f" Features: {len(model.feature_names)}")
    print(f"\n CV Metrics:")
    for name, m in model.metrics.items():
        print(f" {name}:")
        for k, v in m.items():
            print(f" {k}: {v:.4f}")
    print(f"{'#'*60}")
    return results, model
# ============================================================
# RUN
# ============================================================
# Script entry point: runs the full pipeline as soon as this module is
# executed (there is no __main__ guard, so importing the module runs it too)
# and keeps the prediction frame and fitted model as module-level names.
results, model = asyncio.run(main())