# ============================================================
# stock_news_lgbm.py — Proper LightGBM News Impact Model
# No leakage, proper train/test split, real validation
# ============================================================
import asyncio
import bisect
import glob
import os
import pickle
import re
import ssl
import warnings
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from email.utils import parsedate_to_datetime

import aiohttp
import lightgbm as lgb
import nest_asyncio
import numpy as np
import pandas as pd
import yfinance as yf
from bs4 import BeautifulSoup
from textblob import TextBlob
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, mean_squared_error, classification_report,
    confusion_matrix,
)

warnings.filterwarnings("ignore")
nest_asyncio.apply()

# Absolute next-day return above which a move counts as a "heavy hitter".
# Shared by label construction (DataPipeline) and test scoring (StockNewsModel).
HEAVY_HITTER_THRESHOLD = 0.015


# ============================================================
# SCRAPER
# ============================================================
class NewsScraper:
    """Collect Google News RSS items for a ticker across many query variants.

    Articles are de-duplicated by link and the result is capped at ``limit``.
    """

    def __init__(self, limit=600):
        self.limit = limit
        # NOTE(review): certificate verification is disabled for every
        # request (kept from the original). This exposes the scraper to
        # MITM; re-enable verification if the target hosts allow it.
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE

    async def fetch_feed(self, session, url):
        """Return the body of ``url`` as text, or "" on failure / non-200."""
        try:
            async with session.get(
                url, timeout=aiohttp.ClientTimeout(total=8)
            ) as resp:
                if resp.status == 200:
                    return await resp.text()
        except Exception:
            # Best effort: one bad feed must not abort the whole scrape.
            # (A bare ``except:`` would also swallow task cancellation.)
            pass
        return ""

    def parse_feed(self, xml_text, lookback_date):
        """Parse RSS XML into article dicts dated on/after ``lookback_date``.

        Each dict has keys ``title``, ``link``, ``pub_date`` (raw RSS string)
        and ``timestamp`` (ISO-8601 string). Malformed XML or unparseable
        dates are skipped rather than raised.
        """
        articles = []
        if not xml_text:
            return articles
        try:
            root = ET.fromstring(xml_text)
        except ET.ParseError:
            return articles

        lb = lookback_date.date()
        for item in root.findall('.//item'):
            title = item.findtext('title')
            link = item.findtext('link')
            pub = item.findtext('pubDate')
            if not (title and link and pub):
                continue
            try:
                dt = parsedate_to_datetime(pub)
            except (TypeError, ValueError):
                continue
            if dt.date() >= lb:
                articles.append({
                    'title': title,
                    'link': link,
                    'pub_date': pub,
                    'timestamp': dt.isoformat(),
                })
        return articles

    def _build_queries(self, ticker):
        """Return the bare ticker followed by ``"<ticker> <suffix>"`` queries."""
        suffixes = (
            "stock", "news", "market", "earnings", "analyst", "forecast",
            "price target", "options", "technical", "dividend", "industry",
            "competitor", "share price", "hedge fund", "institutional",
            "buy sell hold", "upgrade downgrade", "outperform underperform",
            "bullish bearish", "momentum", "breakout breakdown",
            "rally crash", "surge plunge", "soar tumble", "gains losses",
            "beat miss expectations", "CEO news", "quarterly results",
            "revenue profit", "guidance outlook", "acquisition merger",
            "lawsuit legal SEC", "insider trading", "buyback repurchase",
            "partnership deal", "product launch", "layoffs restructuring",
            "expansion growth", "wall street", "analyst rating",
            "price prediction", "short interest", "short squeeze",
            "put call ratio", "sector outlook", "industry trend",
            "supply chain", "regulation policy", "inflation impact",
            "interest rate", "today", "this week", "latest",
            "breaking news", "update", "premarket", "after hours",
        )
        return [ticker] + [f"{ticker} {s}" for s in suffixes]

    async def scrape(self, ticker, lookback_date, progress_cb=None):
        """Fetch feeds in batches of 20 until ``limit`` unique articles are found.

        ``progress_cb(fraction, n_articles)`` is called after each feed.
        """
        queries = self._build_queries(ticker)
        all_articles = []
        seen = set()
        conn = aiohttp.TCPConnector(limit=50, ssl=self.ssl_context)
        async with aiohttp.ClientSession(connector=conn) as session:
            urls = [
                f"https://news.google.com/rss/search?q={q.replace(' ', '+')}"
                f"&hl=en-US&gl=US&ceid=US:en"
                for q in queries
            ]
            for i in range(0, len(urls), 20):
                if len(all_articles) >= self.limit:
                    break
                batch = urls[i:i + 20]
                tasks = [self.fetch_feed(session, u) for u in batch]
                results = await asyncio.gather(
                    *tasks, return_exceptions=True
                )
                for xml in results:
                    if isinstance(xml, Exception) or not xml:
                        continue
                    for a in self.parse_feed(xml, lookback_date):
                        if a['link'] not in seen:
                            seen.add(a['link'])
                            all_articles.append(a)
                    if progress_cb:
                        progress_cb(
                            min(len(all_articles) / self.limit, 1.0),
                            len(all_articles),
                        )
                await asyncio.sleep(0.1)
        print(f"[Scraper] {len(all_articles)} unique articles")
        return all_articles[:self.limit]


# ============================================================
# CONTENT EXTRACTOR
# ============================================================
class ContentExtractor:
    """Download article pages and reduce them to plain paragraph text."""

    def __init__(self):
        # NOTE(review): certificate verification disabled (kept from original).
        self.ssl_ctx = ssl.create_default_context()
        self.ssl_ctx.check_hostname = False
        self.ssl_ctx.verify_mode = ssl.CERT_NONE

    async def _fetch_one(self, session, url):
        """Return extracted body text for ``url``, or "" on any failure."""
        try:
            async with session.get(
                url,
                timeout=aiohttp.ClientTimeout(total=10),
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
                },
                allow_redirects=True,
            ) as resp:
                if resp.status == 200:
                    html = await resp.text()
                    return self._parse_html(html)
        except Exception:
            # Best effort; do not swallow cancellation (bare except did).
            pass
        return ""

    def _parse_html(self, html):
        """Join <p> texts longer than 30 chars (prefer <article>), cap 3000 chars."""
        try:
            soup = BeautifulSoup(html, 'html.parser')
            for tag in soup(
                ['script', 'style', 'nav', 'header', 'footer',
                 'aside', 'iframe', 'noscript', 'form']
            ):
                tag.decompose()
            article = soup.find('article')
            paras = (article or soup).find_all('p')
            parts = [
                p.get_text(strip=True) for p in paras
                if len(p.get_text(strip=True)) > 30
            ]
            return ' '.join(parts)[:3000]
        except Exception:
            return ""

    async def extract_all(self, articles):
        """Set ``article['content']`` in place for every article; return the list."""
        conn = aiohttp.TCPConnector(limit=25, ssl=self.ssl_ctx)
        async with aiohttp.ClientSession(connector=conn) as session:
            for i in range(0, len(articles), 10):
                batch = articles[i:i + 10]
                tasks = [self._fetch_one(session, a['link']) for a in batch]
                results = await asyncio.gather(
                    *tasks, return_exceptions=True
                )
                for j, content in enumerate(results):
                    articles[i + j]['content'] = (
                        content if isinstance(content, str) else ""
                    )
                await asyncio.sleep(0.15)
        return articles


# ============================================================
# FEATURE ENGINEERING — pure numerical, derived from article text/time
# ============================================================
class Features:
    """Turn article title/content/timestamp into numeric model features."""

    BULL = [
        'upgrade', 'buy', 'outperform', 'beat', 'surge', 'soar',
        'rally', 'breakout', 'strong', 'growth', 'profit', 'record',
        'bullish', 'positive', 'raise', 'upside', 'optimistic',
        'boom', 'gain', 'higher', 'best', 'partnership', 'deal',
        'approval', 'dividend', 'buyback', 'expansion', 'launch',
        'breakthrough', 'recovery', 'momentum', 'confidence',
    ]
    BEAR = [
        'downgrade', 'sell', 'underperform', 'miss', 'plunge',
        'crash', 'tumble', 'breakdown', 'weak', 'decline', 'loss',
        'bearish', 'negative', 'cut', 'lower', 'worst', 'risk',
        'warning', 'concern', 'fear', 'recession', 'lawsuit',
        'investigation', 'fraud', 'layoff', 'restructuring',
        'debt', 'default', 'bankruptcy', 'slump', 'drop', 'falling',
        'disappointing', 'headwind', 'pressure', 'downturn',
        'uncertainty', 'volatile', 'overvalued',
    ]
    URGENT = [
        'breaking', 'alert', 'urgent', 'exclusive', 'flash',
        'developing', 'critical', 'emergency',
    ]
    EARN_WORDS = [
        'earnings', 'eps', 'revenue', 'quarterly', 'q1', 'q2',
        'q3', 'q4', 'fiscal', 'results', 'guidance', 'outlook',
    ]
    ANALYST_WORDS = [
        'analyst', 'rating', 'price target', 'upgrade',
        'downgrade', 'initiate', 'coverage', 'overweight',
        'underweight',
    ]
    # Source name (matched in the lowercased title) -> quality tier.
    SOURCES = {
        'reuters': 3, 'bloomberg': 3, 'wsj': 3,
        'wall street journal': 3, 'financial times': 3,
        'cnbc': 2, 'marketwatch': 2, 'seeking alpha': 2,
        'barrons': 2, 'yahoo finance': 2,
        'benzinga': 1, 'motley fool': 1, 'zacks': 1, 'tipranks': 1,
    }

    def __init__(self, ticker):
        # Index symbols such as "^NSEI" never appear with the caret in
        # headlines; without stripping it the ticker features are always 0.
        self.ticker = ticker.lower().lstrip('^') or ticker.lower()
        self._bull_pats = self._compile_terms(self.BULL)
        self._bear_pats = self._compile_terms(self.BEAR)
        self._urgent_pats = self._compile_terms(self.URGENT)
        self._earn_pats = self._compile_terms(self.EARN_WORDS)
        self._analyst_pats = self._compile_terms(self.ANALYST_WORDS)

    @staticmethod
    def _compile_terms(words):
        """Compile one pattern per term, anchored at a word start.

        The leading ``\\b`` stops false hits such as "cut" in "executive",
        "gain" in "against" or "eps" in "steps", while still matching
        inflections ("surges", "cuts").
        """
        return [re.compile(r'\b' + re.escape(w)) for w in words]

    @staticmethod
    def _count_terms(text, patterns):
        """Number of distinct terms from ``patterns`` present in ``text``."""
        return sum(1 for p in patterns if p.search(text))

    def build(self, df):
        """Return a copy of ``df`` with all feature columns added.

        Input must have columns ``title``, ``content`` and ``timestamp``.
        """
        df = df.copy()
        df['content'] = df['content'].fillna('')
        df['title'] = df['title'].fillna('')
        text = (df['title'] + ' ' + df['content']).str.lower()
        title_low = df['title'].str.lower()

        # --- Sentiment ---
        df['sent_title'] = df['title'].apply(
            lambda x: TextBlob(str(x)).sentiment.polarity
        )
        df['subj_title'] = df['title'].apply(
            lambda x: TextBlob(str(x)).sentiment.subjectivity
        )
        df['sent_content'] = df['content'].apply(
            lambda x: TextBlob(str(x)).sentiment.polarity
        )
        df['subj_content'] = df['content'].apply(
            lambda x: TextBlob(str(x)).sentiment.subjectivity
        )
        # Weighted combo: the headline is weighted more than the body.
        df['sent_combined'] = (
            df['sent_title'] * 0.6 + df['sent_content'] * 0.4
        )
        df['sent_abs'] = df['sent_combined'].abs()
        # Positive when title and body polarity share a sign.
        df['sent_agreement'] = df['sent_title'] * df['sent_content']

        # --- Keyword counts (distinct terms present) ---
        df['n_bull'] = text.apply(
            lambda x: self._count_terms(x, self._bull_pats)
        )
        df['n_bear'] = text.apply(
            lambda x: self._count_terms(x, self._bear_pats)
        )
        df['n_urgent'] = text.apply(
            lambda x: self._count_terms(x, self._urgent_pats)
        )
        df['bull_bear_net'] = df['n_bull'] - df['n_bear']
        df['bull_bear_ratio'] = df['n_bull'] / df['n_bear'].clip(lower=1)
        # Sentiment-keyword alignment: do polarity and keywords agree?
        df['sent_kw_align'] = df['sent_combined'] * df['bull_bear_net']

        # --- Text structure ---
        df['len_title'] = df['title'].str.len()
        df['len_content'] = df['content'].str.len()
        df['words_title'] = df['title'].str.split().str.len().fillna(0)
        df['words_content'] = df['content'].str.split().str.len().fillna(0)
        df['n_excl'] = (df['title'] + df['content']).str.count('!')
        df['n_quest'] = (df['title'] + df['content']).str.count(r'\?')
        df['caps_ratio'] = df['title'].apply(
            lambda x: sum(c.isupper() for c in str(x)) / max(len(str(x)), 1)
        )
        df['n_numbers'] = text.apply(
            lambda x: len(re.findall(r'\d+\.?\d*', x))
        )
        df['n_dollar'] = text.apply(
            lambda x: len(re.findall(r'\$[\d,.]+', x))
        )
        df['n_percent'] = text.apply(
            lambda x: len(re.findall(r'[\d.]+\s*%', x))
        )

        # --- Ticker relevance ---
        df['ticker_in_title'] = title_low.str.contains(
            self.ticker, regex=False
        ).astype(int)
        df['ticker_mentions'] = text.str.count(re.escape(self.ticker))

        # --- Source quality (Google News titles end with " - <Source>") ---
        df['source_tier'] = title_low.apply(
            lambda x: max(
                (t for s, t in self.SOURCES.items() if s in x), default=0
            )
        )

        # --- Temporal ---
        # NOTE(review): hours are UTC, so the market/premarket windows below
        # are UTC hours, not exchange-local hours. Confirm this is intended.
        ts = pd.to_datetime(df['timestamp'], errors='coerce', utc=True)
        df['hour'] = ts.dt.hour.fillna(12).astype(int)
        df['dow'] = ts.dt.dayofweek.fillna(0).astype(int)
        df['is_weekend'] = (df['dow'] >= 5).astype(int)
        df['is_market_hrs'] = (
            (df['hour'] >= 9) & (df['hour'] <= 16)
        ).astype(int)
        df['is_premarket'] = (
            (df['hour'] >= 4) & (df['hour'] < 9)
        ).astype(int)

        # --- Topic flags (distinct topic terms present) ---
        df['is_earnings'] = text.apply(
            lambda x: self._count_terms(x, self._earn_pats)
        )
        df['is_analyst'] = text.apply(
            lambda x: self._count_terms(x, self._analyst_pats)
        )

        # --- Interaction features ---
        df['sent_x_urgent'] = df['sent_combined'] * df['n_urgent']
        df['sent_x_source'] = df['sent_combined'] * df['source_tier']
        df['sent_x_ticker'] = df['sent_combined'] * df['ticker_mentions']
        return df


# ============================================================
# CORE: DATA PIPELINE WITH STRICT TEMPORAL SPLIT
# ============================================================
class DataPipeline:
    """
    Handles the entire data flow with STRICT temporal ordering:
    - Train: articles from day 1 to cutoff
    - Test: articles from cutoff to present
    - Rows are sorted chronologically so walk-forward CV is temporal
    - Labels/features are never derived from the test window for training
    """

    # These are the ONLY columns the model sees
    FEATURE_COLS = [
        'sent_title', 'subj_title', 'sent_content', 'subj_content',
        'sent_combined', 'sent_abs', 'sent_agreement',
        'n_bull', 'n_bear', 'n_urgent', 'bull_bear_net',
        'bull_bear_ratio', 'sent_kw_align',
        'len_title', 'len_content', 'words_title', 'words_content',
        'n_excl', 'n_quest', 'caps_ratio', 'n_numbers',
        'n_dollar', 'n_percent',
        'ticker_in_title', 'ticker_mentions', 'source_tier',
        'hour', 'dow', 'is_weekend', 'is_market_hrs', 'is_premarket',
        'is_earnings', 'is_analyst',
        'sent_x_urgent', 'sent_x_source', 'sent_x_ticker',
        # Market context from the last trading day on or before the
        # article date; the label starts from that same close.
        'vol_5d', 'vol_change', 'price_sma5', 'price_sma20',
        'ret_1d', 'ret_5d',
    ]

    def __init__(self, ticker, train_days=120, test_days=14):
        """
        ticker:     stock symbol (e.g. "^NSEI" for Nifty 50)
        train_days: how many days back for training articles
        test_days:  how many recent days for test articles
        """
        self.ticker = ticker
        self.train_days = train_days
        self.test_days = test_days
        self.scraper = NewsScraper(limit=600)
        self.extractor = ContentExtractor()
        self.features = Features(ticker)
        # Populated by build_dataset(); kept so callers (e.g. the leakage
        # audit) can inspect the exact frames behind X_train / X_test.
        self.train_df = None
        self.test_df = None

    async def build_dataset(self):
        """
        Returns (X_train, y_train_dir, y_train_hh, y_train_ret,
                 X_test, test_df, price_df)

        TEMPORAL SPLIT:
        ├── train_start ──────── cutoff_date ──── today ──┤
        │     TRAIN articles      │    TEST articles       │
        │  labels = next day ret  │  predicted; realised   │
        │                         │  returns only scored   │

        ``test_df`` also carries ``next_ret`` (NaN where the next close is
        not yet known) for out-of-sample scoring; it is never a feature.

        Raises ValueError when there is no price data, no articles, or
        fewer than 20 training articles.
        """
        now = datetime.now()
        train_start = now - timedelta(days=self.train_days + self.test_days)
        cutoff_date = now - timedelta(days=self.test_days)
        test_start = cutoff_date

        print(f"\n{'='*60}")
        print(f"  DATASET CONSTRUCTION")
        print(f"  Ticker: {self.ticker}")
        print(f"  Train range: {train_start.date()} → {cutoff_date.date()}")
        print(f"  Test range:  {cutoff_date.date()} → {now.date()}")
        print(f"{'='*60}")

        # ---- 1. GET PRICE DATA ----
        print("\n[1/6] Downloading price data...")
        price_df = self._get_prices(train_start)
        if price_df.empty:
            raise ValueError(f"No price data for {self.ticker}")
        print(f"  → {len(price_df)} trading days loaded")
        print(f"  → Price range: {price_df['Close'].min():.2f} - {price_df['Close'].max():.2f}")

        # ---- 2. SCRAPE TRAINING ARTICLES ----
        print("\n[2/6] Scraping TRAINING articles...")
        train_articles = await self.scraper.scrape(
            self.ticker, train_start,
            progress_cb=lambda p, n: print(
                f"\r  Scraping: {n} articles", end='', flush=True
            )
        )
        print(f"\n  → {len(train_articles)} raw training articles")

        # ---- 3. SCRAPE TEST ARTICLES ----
        print("\n[3/6] Scraping TEST articles...")
        # Fresh scraper so the test scrape has its own article budget
        test_scraper = NewsScraper(limit=600)
        test_articles = await test_scraper.scrape(
            self.ticker, test_start,
            progress_cb=lambda p, n: print(
                f"\r  Scraping: {n} articles", end='', flush=True
            )
        )
        print(f"\n  → {len(test_articles)} raw test articles")

        # ---- 4. EXTRACT CONTENT ----
        print("\n[4/6] Extracting article content...")
        # The training scrape's lookback also covers the test window, so
        # both scrapes return many of the same links. Without this, those
        # pages are fetched twice and appear twice in the test set.
        all_articles = self._dedupe_by_link(train_articles + test_articles)
        if not all_articles:
            raise ValueError(f"No articles scraped for {self.ticker}")
        all_articles = await self.extractor.extract_all(all_articles)
        n_with_content = sum(
            1 for a in all_articles if len(a.get('content', '')) > 50
        )
        print(f"  → {n_with_content}/{len(all_articles)} have body text")

        # ---- 5. TEMPORAL SPLIT (strict, no leakage) ----
        print("\n[5/6] Applying strict temporal split...")
        df = pd.DataFrame(all_articles)
        df['ts'] = pd.to_datetime(df['timestamp'], errors='coerce', utc=True)
        # Chronological order is what makes the walk-forward CV temporal;
        # scrape order is query order, not time order.
        df = (
            df.dropna(subset=['ts'])
            .sort_values('ts', kind='mergesort')
            .reset_index(drop=True)
        )
        df['date'] = df['ts'].dt.date

        # Strict split by (UTC) date
        cutoff = cutoff_date.date()
        train_df = df[df['date'] < cutoff].copy().reset_index(drop=True)
        test_df = df[df['date'] >= cutoff].copy().reset_index(drop=True)

        # Defensive: no link may appear on both sides of the split
        train_links = set(train_df['link'])
        test_df = test_df[
            ~test_df['link'].isin(train_links)
        ].reset_index(drop=True)

        print(f"  → Train articles: {len(train_df)}")
        print(f"  → Test articles:  {len(test_df)}")
        print(f"  → Cutoff date:    {cutoff}")
        print(f"  → Train date range: {train_df['date'].min()} → {train_df['date'].max()}")
        print(f"  → Test date range:  {test_df['date'].min()} → {test_df['date'].max()}")

        if len(train_df) < 20:
            raise ValueError(
                f"Only {len(train_df)} training articles — need more data. "
                f"Try increasing train_days or checking if the ticker is valid."
            )

        # ---- 6. ENGINEER FEATURES + LABELS ----
        print("\n[6/6] Engineering features and labels...")
        train_df = self.features.build(train_df)
        test_df = self.features.build(test_df)

        train_df = self._add_market_context(train_df, price_df)
        test_df = self._add_market_context(test_df, price_df)

        train_df = self._add_labels(train_df, price_df)
        # Realised returns for the test window, used only to score the
        # held-out predictions; they never enter X_test.
        test_df = self._add_labels(test_df, price_df)

        # Drop rows without labels (no next-day close available)
        before = len(train_df)
        train_df = train_df.dropna(subset=['next_ret']).reset_index(drop=True)
        print(f"  → Dropped {before - len(train_df)} unlabeled training rows")
        print(f"  → Final train size: {len(train_df)}")
        print(f"  → Final test size:  {len(test_df)}")
        print(f"  → Test rows with realised return: {int(test_df['next_ret'].notna().sum())}")

        avail_feats = [c for c in self.FEATURE_COLS if c in train_df.columns]
        print(f"  → Features used: {len(avail_feats)}")

        X_train = train_df[avail_feats].fillna(0).replace([np.inf, -np.inf], 0)
        X_test = test_df[avail_feats].fillna(0).replace([np.inf, -np.inf], 0)

        # Labels
        y_dir = (train_df['next_ret'] > 0).astype(int)
        y_hh = (train_df['next_ret'].abs() > HEAVY_HITTER_THRESHOLD).astype(int)
        y_ret = train_df['next_ret'].astype(float)

        print(f"\n  LABEL DISTRIBUTIONS (Training):")
        print(f"    Direction UP:   {y_dir.mean()*100:.1f}%")
        print(f"    Heavy hitters:  {y_hh.mean()*100:.1f}%")
        print(f"    Mean return:    {y_ret.mean()*100:.4f}%")
        print(f"    Std return:     {y_ret.std()*100:.4f}%")
        print(f"    Min return:     {y_ret.min()*100:.4f}%")
        print(f"    Max return:     {y_ret.max()*100:.4f}%")

        self.train_df = train_df
        self.test_df = test_df
        return X_train, y_dir, y_hh, y_ret, X_test, test_df, price_df

    @staticmethod
    def _dedupe_by_link(articles):
        """Keep the first article for each link, preserving order."""
        seen = set()
        unique = []
        for a in articles:
            if a['link'] not in seen:
                seen.add(a['link'])
                unique.append(a)
        return unique

    def _get_prices(self, start_date):
        """Download daily OHLCV from yfinance and add rolling context columns.

        ``Date`` is converted to a plain ``datetime.date`` (timezone dropped).
        """
        stock = yf.Ticker(self.ticker)
        df = stock.history(
            start=start_date - timedelta(days=30),  # buffer for rolling windows
            end=datetime.now() + timedelta(days=2),
            auto_adjust=True,
        )
        if df.empty:
            return df
        df = df.reset_index()
        # Handle timezone-aware dates from yfinance
        if hasattr(df['Date'].dtype, 'tz') and df['Date'].dtype.tz is not None:
            df['Date'] = df['Date'].dt.tz_localize(None)
        df['Date'] = pd.to_datetime(df['Date']).dt.date

        # NOTE(review): if run during market hours the last row is likely a
        # partial bar whose Close is not final — confirm before trusting it.
        df['ret'] = df['Close'].pct_change()
        df['ret_5d'] = df['Close'].pct_change(5)
        df['vol_5d'] = df['ret'].rolling(5).std()
        df['vol_20d'] = df['ret'].rolling(20).std()
        df['vol_change'] = df['Volume'].pct_change()
        df['sma5'] = df['Close'].rolling(5).mean()
        df['sma20'] = df['Close'].rolling(20).mean()
        df['price_sma5'] = df['Close'] / df['sma5'] - 1
        df['price_sma20'] = df['Close'] / df['sma20'] - 1
        return df

    def _add_market_context(self, articles_df, price_df):
        """Attach market features from the last trading day ON OR BEFORE the article date.

        For an article published intraday this includes that day's close,
        which is aligned with the label (return from that close onward).
        Articles dated before the first available trading day get NaN
        rather than a later day's values.
        """
        trading_dates = sorted(price_df['Date'].unique())

        def get_prior_trading_date(pub_date):
            i = bisect.bisect_right(trading_dates, pub_date)
            return trading_dates[i - 1] if i > 0 else None

        articles_df['prior_td'] = articles_df['date'].apply(get_prior_trading_date)

        price_map = price_df.set_index('Date')
        for col, src in [
            ('vol_5d', 'vol_5d'),
            ('vol_change', 'vol_change'),
            ('price_sma5', 'price_sma5'),
            ('price_sma20', 'price_sma20'),
            ('ret_1d', 'ret'),
            ('ret_5d', 'ret_5d'),
        ]:
            mapping = price_map[src].to_dict() if src in price_map.columns else {}
            articles_df[col] = articles_df['prior_td'].map(mapping)
        return articles_df

    def _add_labels(self, articles_df, price_df):
        """Add ``next_ret``: the first close-to-close return after the article.

        c0 = close of the last trading day on or before the article date,
        c1 = close of the first trading day strictly after it;
        next_ret = c1 / c0 - 1 (NaN if either close is unavailable).

        On a trading day T this is close(T+1)/close(T) - 1. For a weekend or
        holiday article it is e.g. close(Mon)/close(Fri) - 1. That is the
        move in which the news is first priced. The previous Mon→Tue label
        skipped that move.
        """
        trading_dates = sorted(price_df['Date'].unique())
        price_close = price_df.set_index('Date')['Close'].to_dict()

        def next_return(pub_date):
            i = bisect.bisect_right(trading_dates, pub_date)  # first td > pub
            if i == 0 or i >= len(trading_dates):
                return np.nan
            c0 = price_close.get(trading_dates[i - 1])
            c1 = price_close.get(trading_dates[i])
            if c0 and c1 and c0 > 0:
                return c1 / c0 - 1
            return np.nan

        articles_df['next_ret'] = [next_return(d) for d in articles_df['date']]
        return articles_df


# ============================================================
# MODEL: LightGBM with walk-forward validation
# ============================================================
class StockNewsModel:
    """
    Three LightGBM models:
    1. Direction: will next day be UP? (binary)
    2. Heavy hitter: will the move exceed HEAVY_HITTER_THRESHOLD? (binary)
    3. Return: predicted return magnitude (regression)

    Training uses expanding-window walk-forward CV on the (chronologically
    sorted) training set; early stopping inside each fold uses the tail of
    that fold's training window, never the fold being scored. The final
    models use the median CV best iteration. Predictions on the held-out
    test window are scored against realised returns when available.
    """

    # Hyper-parameters shared by all models (objective/metric set per model)
    BASE_PARAMS = {
        'boosting_type': 'gbdt',
        'num_leaves': 24,
        'learning_rate': 0.03,
        'feature_fraction': 0.7,
        'bagging_fraction': 0.7,
        'bagging_freq': 5,
        'min_child_samples': 20,
        'reg_alpha': 0.3,
        'reg_lambda': 0.3,
        'verbose': -1,
        'n_estimators': 500,
        'random_state': 42,
    }
    MIN_TRAIN_PCT = 0.6     # first fold trains on 60% of rows
    VAL_SIZE_PCT = 0.1      # each validation chunk is 10% of rows
    EARLY_STOP_PCT = 0.15   # tail of each fold's train window for early stopping
    EARLY_STOP_ROUNDS = 30

    def __init__(self):
        self.m_dir = None   # direction model
        self.m_hh = None    # heavy hitter model
        self.m_ret = None   # return model
        self.feature_names = None
        self.metrics = {}

    def train_and_evaluate(self, X_train, y_dir, y_hh, y_ret, X_test, test_df):
        """
        1. Walk-forward CV on training data (internal validation)
        2. Train final models on ALL training data
        3. Predict on the test set (never seen during training) and score
           against realised returns where ``test_df`` has ``next_ret``
        Returns the test predictions sorted by impact.
        """
        self.feature_names = list(X_train.columns)
        n_train = len(X_train)
        n_test = len(X_test)

        print(f"\n{'='*60}")
        print(f"  MODEL TRAINING")
        print(f"  Training samples: {n_train}")
        print(f"  Test samples:     {n_test}")
        print(f"  Features:         {len(self.feature_names)}")
        print(f"{'='*60}")

        # MODEL 1: DIRECTION CLASSIFIER
        print(f"\n{'─'*60}")
        print(f"  Model 1: DIRECTION (Up/Down Tomorrow)")
        print(f"{'─'*60}")
        self.m_dir = self._train_classifier(X_train, y_dir, "direction")

        # MODEL 2: HEAVY HITTER CLASSIFIER
        print(f"\n{'─'*60}")
        print(f"  Model 2: HEAVY HITTER (>1.5% Move)")
        print(f"{'─'*60}")
        n_pos = y_hh.sum()
        n_neg = len(y_hh) - n_pos
        scale = n_neg / max(n_pos, 1)  # counter class imbalance
        print(f"  Class balance: {n_pos} positive / {n_neg} negative")
        print(f"  scale_pos_weight: {scale:.2f}")
        self.m_hh = self._train_classifier(
            X_train, y_hh, "heavy_hitter",
            extra_params={'scale_pos_weight': scale},
        )

        # MODEL 3: RETURN REGRESSOR
        print(f"\n{'─'*60}")
        print(f"  Model 3: RETURN REGRESSOR")
        print(f"{'─'*60}")
        self.m_ret = self._train_regressor(X_train, y_ret)

        # TEST SET
        print(f"\n{'='*60}")
        print(f"  TEST SET PREDICTIONS ({n_test} articles)")
        print(f"{'='*60}")
        test_results = self._predict(X_test, test_df)
        self._evaluate_test(test_results)
        return test_results

    # ---------- shared helpers ----------

    def _walk_forward_splits(self, n):
        """Return (n_folds, [(fold, train_end, val_end), ...]) for ``n`` rows."""
        n_folds = int(round((1.0 - self.MIN_TRAIN_PCT) / self.VAL_SIZE_PCT))
        splits = []
        for fold in range(n_folds):
            train_end = int(n * (self.MIN_TRAIN_PCT + fold * self.VAL_SIZE_PCT))
            val_end = min(
                int(n * (self.MIN_TRAIN_PCT + (fold + 1) * self.VAL_SIZE_PCT)), n
            )
            if train_end < val_end:
                splits.append((fold, train_end, val_end))
        return n_folds, splits

    def _fit_fold(self, model_cls, params, X_tr, y_tr, classification):
        """Fit one CV model, early-stopping on the chronological tail of X_tr.

        Returns (model, iterations_used). Falls back to a plain fit with
        ``params['n_estimators']`` when the window is too small to split.
        """
        n_es = int(len(X_tr) * self.EARLY_STOP_PCT)
        usable = n_es >= 5 and len(X_tr) - n_es >= 5
        if usable and classification:
            usable = y_tr.iloc[:-n_es].nunique() >= 2
        mdl = model_cls(**params)
        if not usable:
            mdl.fit(X_tr, y_tr)
            return mdl, params['n_estimators']
        mdl.fit(
            X_tr.iloc[:-n_es], y_tr.iloc[:-n_es],
            eval_set=[(X_tr.iloc[-n_es:], y_tr.iloc[-n_es:])],
            callbacks=[
                lgb.early_stopping(self.EARLY_STOP_ROUNDS, verbose=False),
                lgb.log_evaluation(0),
            ],
        )
        best = getattr(mdl, 'best_iteration_', None) or params['n_estimators']
        return mdl, best

    @staticmethod
    def _fit_final(model_cls, params, X, y, best_iters):
        """Fit on all rows using the median CV best iteration (if any)."""
        n_est = int(np.median(best_iters)) if best_iters else params['n_estimators']
        n_est = max(n_est, 1)
        print(f"\n  Training final model on all {len(X)} samples ({n_est} trees)...")
        final = model_cls(**{**params, 'n_estimators': n_est})
        final.fit(X, y)
        return final, n_est

    @staticmethod
    def _print_importance(model, columns, name):
        """Print the 10 most important features as a bar chart."""
        imp = pd.DataFrame({
            'feature': columns,
            'importance': model.feature_importances_,
        }).sort_values('importance', ascending=False)
        top = max(imp['importance'].max(), 1)
        print(f"\n  Top 10 features ({name}):")
        for _, row in imp.head(10).iterrows():
            bar = '█' * int(row['importance'] / top * 20)
            print(f"    {row['feature']:25s} {row['importance']:6.0f} {bar}")

    @staticmethod
    def _impact(res):
        """Composite impact score from HH prob, |pred_ret| and direction confidence."""
        return (
            res['pred_hh_prob'] * 0.4
            + res['pred_ret'].abs() * 20 * 0.3
            + (res['pred_dir_prob'] - 0.5).abs() * 2 * 0.3
        )

    # ---------- model training ----------

    def _train_classifier(self, X, y, name, extra_params=None):
        """Walk-forward CV + final binary classifier; stores metrics under ``name``."""
        params = {**self.BASE_PARAMS, 'objective': 'binary', 'metric': 'binary_logloss'}
        if extra_params:
            params.update(extra_params)

        n_folds, splits = self._walk_forward_splits(len(X))
        print(f"\n  Walk-forward validation ({n_folds} folds):")
        fold_results, best_iters = [], []
        for fold, train_end, val_end in splits:
            X_tr, y_tr = X.iloc[:train_end], y.iloc[:train_end]
            X_vl, y_vl = X.iloc[train_end:val_end], y.iloc[train_end:val_end]
            if len(y_vl) < 5 or y_vl.nunique() < 2 or y_tr.nunique() < 2:
                continue

            mdl, best_iter = self._fit_fold(
                lgb.LGBMClassifier, params, X_tr, y_tr, classification=True
            )
            best_iters.append(best_iter)

            pred = mdl.predict(X_vl)
            prob = mdl.predict_proba(X_vl)[:, 1]
            acc = accuracy_score(y_vl, pred)
            try:
                auc = roc_auc_score(y_vl, prob)
            except ValueError:
                auc = 0.5
            f1 = f1_score(y_vl, pred, zero_division=0)
            fold_results.append({
                'acc': acc, 'auc': auc, 'f1': f1,
                'train_n': len(X_tr), 'val_n': len(X_vl),
            })
            print(
                f"    Fold {fold+1}: Acc={acc:.4f}  AUC={auc:.4f}  "
                f"F1={f1:.4f}  (train={len(X_tr)}, val={len(X_vl)})"
            )

        if fold_results:
            avg_acc = np.mean([f['acc'] for f in fold_results])
            avg_auc = np.mean([f['auc'] for f in fold_results])
            avg_f1 = np.mean([f['f1'] for f in fold_results])
            print(f"\n    CV MEAN: Acc={avg_acc:.4f}  AUC={avg_auc:.4f}  F1={avg_f1:.4f}")
            self.metrics[name] = {
                'cv_acc': avg_acc, 'cv_auc': avg_auc, 'cv_f1': avg_f1,
            }

        final, n_est = self._fit_final(lgb.LGBMClassifier, params, X, y, best_iters)
        self.metrics.setdefault(name, {})['final_n_estimators'] = n_est
        self._print_importance(final, X.columns, name)
        return final

    def _train_regressor(self, X, y):
        """Walk-forward CV + final next-day return regressor."""
        params = {**self.BASE_PARAMS, 'objective': 'regression', 'metric': 'rmse'}

        n_folds, splits = self._walk_forward_splits(len(X))
        print(f"\n  Walk-forward validation ({n_folds} folds):")
        fold_results, best_iters = [], []
        for fold, train_end, val_end in splits:
            X_tr, y_tr = X.iloc[:train_end], y.iloc[:train_end]
            X_vl, y_vl = X.iloc[train_end:val_end], y.iloc[train_end:val_end]
            if len(y_vl) < 5:
                continue

            mdl, best_iter = self._fit_fold(
                lgb.LGBMRegressor, params, X_tr, y_tr, classification=False
            )
            best_iters.append(best_iter)

            pred = mdl.predict(X_vl)
            actual = y_vl.to_numpy()
            rmse = float(np.sqrt(mean_squared_error(actual, pred)))
            # Direction accuracy implied by the regression output
            dir_acc = float(np.mean(np.sign(pred) == np.sign(actual)))
            # np.corrcoef yields NaN if either side is constant
            if len(actual) > 2 and np.std(pred) > 0 and np.std(actual) > 0:
                corr = float(np.corrcoef(actual, pred)[0, 1])
            else:
                corr = 0.0
            fold_results.append({'rmse': rmse, 'dir_acc': dir_acc, 'corr': corr})
            print(
                f"    Fold {fold+1}: RMSE={rmse:.6f}  "
                f"DirAcc={dir_acc:.4f}  Corr={corr:.4f}"
            )

        if fold_results:
            avg_rmse = np.mean([f['rmse'] for f in fold_results])
            avg_dir = np.mean([f['dir_acc'] for f in fold_results])
            avg_corr = np.mean([f['corr'] for f in fold_results])
            print(f"\n    CV MEAN: RMSE={avg_rmse:.6f}  DirAcc={avg_dir:.4f}  Corr={avg_corr:.4f}")
            self.metrics['return'] = {
                'cv_rmse': avg_rmse, 'cv_dir_acc': avg_dir, 'cv_corr': avg_corr,
            }

        final, n_est = self._fit_final(lgb.LGBMRegressor, params, X, y, best_iters)
        self.metrics.setdefault('return', {})['final_n_estimators'] = n_est
        self._print_importance(final, X.columns, 'return')
        return final

    # ---------- prediction / evaluation ----------

    def _predict(self, X_test, test_df):
        """Run all 3 models on test data; return rows sorted by impact (desc)."""
        res = test_df.copy()
        if len(X_test) == 0:
            # LightGBM cannot predict on zero rows; return the empty schema.
            for col in ('pred_dir', 'pred_dir_prob', 'pred_dir_label',
                        'pred_hh', 'pred_hh_prob', 'pred_ret',
                        'pred_ret_pct', 'impact'):
                res[col] = pd.Series(dtype=float, index=res.index)
            return res

        res['pred_dir'] = self.m_dir.predict(X_test)
        res['pred_dir_prob'] = self.m_dir.predict_proba(X_test)[:, 1]
        res['pred_dir_label'] = res['pred_dir'].map({1: 'UP', 0: 'DOWN'})

        res['pred_hh'] = self.m_hh.predict(X_test)
        res['pred_hh_prob'] = self.m_hh.predict_proba(X_test)[:, 1]

        res['pred_ret'] = self.m_ret.predict(X_test)
        res['pred_ret_pct'] = res['pred_ret'] * 100

        res['impact'] = self._impact(res)
        return res.sort_values('impact', ascending=False)

    def _evaluate_test(self, res):
        """Score test predictions against realised ``next_ret`` if present.

        Stores results in ``self.metrics['test']``. Articles from the same
        day share one label, so these numbers are driven by few returns.
        """
        if 'next_ret' not in res.columns:
            return
        lab = res.dropna(subset=['next_ret'])
        print(f"\n  Out-of-sample: {len(lab)}/{len(res)} test articles have a realised next-day return")
        if len(lab) < 5:
            print("  (too few labelled test articles to score)")
            return
        y_dir = (lab['next_ret'] > 0).astype(int)
        y_hh = (lab['next_ret'].abs() > HEAVY_HITTER_THRESHOLD).astype(int)
        acc = accuracy_score(y_dir, lab['pred_dir'].astype(int))
        try:
            auc = roc_auc_score(y_dir, lab['pred_dir_prob'])
        except ValueError:  # only one class realised in the window
            auc = 0.5
        hh_f1 = f1_score(y_hh, lab['pred_hh'].astype(int), zero_division=0)
        rmse = float(np.sqrt(mean_squared_error(lab['next_ret'], lab['pred_ret'])))
        print(
            f"    TEST: DirAcc={acc:.4f}  DirAUC={auc:.4f}  "
            f"HH-F1={hh_f1:.4f}  RMSE={rmse:.6f}"
        )
        self.metrics['test'] = {
            'test_dir_acc': acc, 'test_dir_auc': auc,
            'test_hh_f1': hh_f1, 'test_ret_rmse': rmse,
            'test_n': len(lab),
        }

    def predict_new(self, X):
        """Predict on brand new data (same feature columns as training)."""
        res = pd.DataFrame(index=X.index)
        res['pred_dir'] = self.m_dir.predict(X)
        res['pred_dir_prob'] = self.m_dir.predict_proba(X)[:, 1]
        res['pred_hh'] = self.m_hh.predict(X)
        res['pred_hh_prob'] = self.m_hh.predict_proba(X)[:, 1]
        res['pred_ret'] = self.m_ret.predict(X)
        res['impact'] = self._impact(res)
        return res

    def save(self, path):
        """Pickle the three models, feature names and metrics to ``path``.

        NOTE: only unpickle files you created yourself — pickle can run code.
        """
        with open(path, 'wb') as f:
            pickle.dump({
                'm_dir': self.m_dir,
                'm_hh': self.m_hh,
                'm_ret': self.m_ret,
                'features': self.feature_names,
                'metrics': self.metrics,
            }, f)
        print(f"Saved → {path}")


# ============================================================
# DISPLAY
# ============================================================
def display_predictions(results, ticker, top_n=25):
    """Show ranked predictions with heavy hitters highlighted."""
    total = len(results)
    if total == 0:
        print("No predictions.")
        return

    n_hh = (results['pred_hh'] == 1).sum()
    n_up = (results['pred_dir'] == 1).sum()

    print(f"\n{'='*80}")
    print(f"  PREDICTIONS: {ticker}")
    print(f"  {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(f"{'='*80}")
    print(f"  Articles analyzed:  {total}")
    print(f"  Heavy hitters:      {n_hh} ({n_hh/total*100:.1f}%)")
    print(f"  Bullish:            {n_up} ({n_up/total*100:.1f}%)")
    print(f"  Bearish:            {total-n_up} ({(total-n_up)/total*100:.1f}%)")
    if 'pred_ret' in results.columns:
        print(f"  Mean pred return:   {results['pred_ret'].mean()*100:+.4f}%")

    print(f"\n{'─'*80}")
    print(f"  TOP {min(top_n, total)} ARTICLES BY PREDICTED IMPACT")
    print(f"{'─'*80}")
    for i, (_, r) in enumerate(results.head(top_n).iterrows()):
        title = str(r.get('title', ''))[:75]
        d = r.get('pred_dir_label', '?')
        hh = r.get('pred_hh_prob', 0)
        ret = r.get('pred_ret_pct', 0)
        imp = r.get('impact', 0)
        sent = r.get('sent_combined', 0)
        d_icon = "🟢" if d == "UP" else "🔴"
        h_icon = "🔥" if hh > 0.5 else "  "
        print(f"\n  {i+1:2d}. {h_icon}{d_icon} {title}")
        print(
            f"      Impact={imp:.3f}  HH={hh:.3f}  "
            f"Ret={ret:+.3f}%  Sent={sent:+.3f}"
        )

    # Overall signal: impact-weighted direction probability and return
    print(f"\n{'='*80}")
    if results['impact'].sum() > 0:
        w_dir = (
            (results['pred_dir_prob'] * results['impact']).sum()
            / results['impact'].sum()
        )
        w_ret = (
            (results['pred_ret'] * results['impact']).sum()
            / results['impact'].sum()
        ) * 100
    else:
        w_dir = results['pred_dir_prob'].mean()
        w_ret = results['pred_ret'].mean() * 100

    sig = "BULLISH" if w_dir > 0.55 else "BEARISH" if w_dir < 0.45 else "NEUTRAL"
    icon = "🟢" if sig == "BULLISH" else "🔴" if sig == "BEARISH" else "⚪"
    print(f"  {icon} SIGNAL: {sig} (prob={w_dir:.3f})")
    print(f"  📊 WEIGHTED RETURN: {w_ret:+.4f}%")
    print(f"{'='*80}")


# ============================================================
# LEAKAGE AUDIT — run this to verify no contamination
# ============================================================
def audit_leakage(X_train, X_test, train_df, test_df):
    """Check the real train/test frames for leakage; return True if all pass.

    ``train_df``/``test_df`` need ``date`` and ``link`` columns.
    """
    print(f"\n{'='*60}")
    print(f"  DATA LEAKAGE AUDIT")
    print(f"{'='*60}")

    # 1. Temporal ordering (vacuously true if either side is empty)
    if len(train_df) and len(test_df):
        train_max = train_df['date'].max()
        test_min = test_df['date'].min()
        ok1 = train_max < test_min
        print(f"  [{'✓' if ok1 else '✗'}] Train max date ({train_max}) < Test min date ({test_min})")
    else:
        ok1 = True
        print(f"  [✓] Temporal ordering (train={len(train_df)}, test={len(test_df)} rows)")

    # 2. No shared articles
    overlap = set(train_df['link']) & set(test_df['link'])
    ok2 = len(overlap) == 0
    print(f"  [{'✓' if ok2 else '✗'}] No shared articles (overlap={len(overlap)})")

    # 3. No future-looking feature names
    future_cols = [
        c for c in X_train.columns
        if 'next' in c or 'future' in c or 'target' in c or 'label' in c
    ]
    ok3 = len(future_cols) == 0
    print(f"  [{'✓' if ok3 else '✗'}] No future-looking feature names (found: {future_cols})")

    # 4. Identical feature columns, in the same order
    ok4 = list(X_train.columns) == list(X_test.columns)
    print(f"  [{'✓' if ok4 else '✗'}] Same feature columns: train={X_train.shape[1]} test={X_test.shape[1]}")

    # 5. Feature statistics comparison (informational)
    print(f"\n  Feature statistics comparison:")
    print(f"  {'Feature':25s} {'Train Mean':>12s} {'Test Mean':>12s} {'Ratio':>8s}")
    print(f"  {'─'*57}")
    for col in X_train.columns[:10]:
        tr_mean = X_train[col].mean()
        te_mean = X_test[col].mean() if col in X_test.columns else float('nan')
        ratio = te_mean / tr_mean if abs(tr_mean) > 1e-6 else 0
        print(f"  {col:25s} {tr_mean:12.4f} {te_mean:12.4f} {ratio:8.2f}")

    all_ok = ok1 and ok2 and ok3 and ok4
    print(f"\n  {'✓ ALL CHECKS PASSED' if all_ok else '✗ LEAKAGE DETECTED'}")
    print(f"{'='*60}")
    return all_ok


# ============================================================
# MAIN
# ============================================================
async def main():
    """Build the dataset, audit it, train/evaluate, display and save outputs."""
    # ========== CONFIG ==========
    TICKER = "^NSEI"       # Nifty 50
    TRAIN_DAYS = 120        # 4 months of training articles
    TEST_DAYS = 14          # 2 weeks of test articles
    # ============================

    print(f"\n{'#'*60}")
    print(f"  LightGBM NEWS IMPACT MODEL")
    print(f"  Ticker: {TICKER}")
    print(f"  Train: {TRAIN_DAYS} days | Test: {TEST_DAYS} days")
    print(f"{'#'*60}")

    pipeline = DataPipeline(TICKER, train_days=TRAIN_DAYS, test_days=TEST_DAYS)
    (
        X_train, y_dir, y_hh, y_ret, X_test, test_df, price_df
    ) = await pipeline.build_dataset()

    # Audit the real frames behind X_train / X_test. A synthetic stand-in
    # (fixed dates, made-up links) would pass by construction.
    audit_leakage(X_train, X_test, pipeline.train_df, test_df)

    model = StockNewsModel()
    results = model.train_and_evaluate(
        X_train, y_dir, y_hh, y_ret, X_test, test_df
    )

    display_predictions(results, TICKER, top_n=25)

    model.save(f'{TICKER.replace("^","")}_model.pkl')

    out_file = (
        f'{TICKER.replace("^","")}_predictions_'
        f'{datetime.now().strftime("%Y%m%d")}.csv'
    )
    save_cols = [
        c for c in [
            'title', 'link', 'timestamp', 'date',
            'pred_dir_label', 'pred_dir_prob',
            'pred_hh', 'pred_hh_prob',
            'pred_ret_pct', 'impact',
            'sent_combined', 'n_bull', 'n_bear',
        ] if c in results.columns
    ]
    results[save_cols].to_csv(out_file, index=False)
    print(f"\nSaved → {out_file}")

    print(f"\n{'#'*60}")
    print(f"  FINAL SUMMARY")
    print(f"{'#'*60}")
    print(f"  Train samples: {len(X_train)}")
    print(f"  Test samples:  {len(X_test)}")
    print(f"  Features:      {len(model.feature_names)}")
    print(f"\n  CV Metrics:")
    for name, m in model.metrics.items():
        print(f"    {name}:")
        for k, v in m.items():
            print(f"      {k}: {v:.4f}")
    print(f"{'#'*60}")

    return results, model


# ============================================================
# RUN
# ============================================================
if __name__ == "__main__":
    results, model = asyncio.run(main())