|
|
|
|
|
|
|
|
|
|
import asyncio
import glob
import os
import pickle
import re
import ssl
import warnings
import xml.etree.ElementTree as ET
from bisect import bisect_left
from datetime import datetime, timedelta
from email.utils import parsedate_to_datetime
from urllib.parse import quote_plus

import aiohttp
import lightgbm as lgb
import nest_asyncio
import numpy as np
import pandas as pd
import yfinance as yf
from bs4 import BeautifulSoup
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    mean_squared_error,
    precision_score,
    recall_score,
    roc_auc_score,
)
from textblob import TextBlob
|
|
|
# Install a catch-all "ignore" filter: every warning category is suppressed
# for the rest of the process.
warnings.filterwarnings(action="ignore")

# Patch asyncio via nest_asyncio before any coroutine is run.
# NOTE(review): presumably so asyncio.run() also works inside an already
# running loop (e.g. a notebook) -- confirm against the nest_asyncio docs.
nest_asyncio.apply()
|
|
|
|
|
|
|
|
|
|
|
|
|
class NewsScraper:
    """Collect Google News RSS items for a ticker.

    A fixed list of search queries is built from the ticker, the matching
    RSS feeds are fetched in concurrent batches, and the items are
    de-duplicated by link.
    """

    # Suffixes appended to the ticker; the bare ticker itself is always the
    # first query.
    QUERY_SUFFIXES = (
        "stock", "news", "market",
        "earnings", "analyst", "forecast",
        "price target", "options", "technical",
        "dividend", "industry", "competitor",
        "share price", "hedge fund",
        "institutional",
        "buy sell hold", "upgrade downgrade",
        "outperform underperform",
        "bullish bearish", "momentum",
        "breakout breakdown", "rally crash",
        "surge plunge", "soar tumble",
        "gains losses", "beat miss expectations",
        "CEO news", "quarterly results",
        "revenue profit", "guidance outlook",
        "acquisition merger", "lawsuit legal SEC",
        "insider trading", "buyback repurchase",
        "partnership deal", "product launch",
        "layoffs restructuring", "expansion growth",
        "wall street", "analyst rating",
        "price prediction", "short interest",
        "short squeeze", "put call ratio",
        "sector outlook", "industry trend",
        "supply chain", "regulation policy",
        "inflation impact", "interest rate",
        "today", "this week", "latest",
        "breaking news", "update",
        "premarket", "after hours",
    )

    def __init__(self, limit=600, verify_ssl=False):
        """
        limit:      maximum number of articles returned by scrape()
        verify_ssl: when True, keep the default certificate and hostname
                    verification; the default (False) preserves the
                    historical behaviour of disabling both
        """
        self.limit = limit
        self.ssl_context = ssl.create_default_context()
        if not verify_ssl:
            # SECURITY: with verification off, the feed contents can be
            # tampered with in transit. Pass verify_ssl=True wherever the
            # local certificate store is usable.
            self.ssl_context.check_hostname = False
            self.ssl_context.verify_mode = ssl.CERT_NONE

    async def fetch_feed(self, session, url):
        """Return the response body for `url`, or "" on a non-200 status or
        a network/timeout/decoding failure."""
        try:
            async with session.get(
                url, timeout=aiohttp.ClientTimeout(total=8)
            ) as resp:
                if resp.status == 200:
                    return await resp.text()
        except (aiohttp.ClientError, asyncio.TimeoutError, UnicodeDecodeError):
            # Best effort: one unreachable feed must not abort the scrape.
            # Only I/O failures are caught, so task cancellation and
            # KeyboardInterrupt still propagate.
            return ""
        return ""

    def parse_feed(self, xml_text, lookback_date):
        """Parse RSS text into article dicts published on/after a date.

        xml_text:      raw RSS document ("" yields an empty list)
        lookback_date: datetime; items whose pubDate falls on an earlier
                       calendar day are dropped

        Returns a list of dicts with keys title, link, pub_date (the raw
        pubDate string) and timestamp (ISO-8601). Malformed XML yields an
        empty list; items with a missing field or an unparseable pubDate
        are skipped.
        """
        articles = []
        if not xml_text:
            return articles
        try:
            root = ET.fromstring(xml_text)
        except ET.ParseError:
            return articles

        earliest = lookback_date.date()
        for item in root.findall('.//item'):
            title = item.findtext('title')
            link = item.findtext('link')
            pub_date = item.findtext('pubDate')
            if not (title and link and pub_date):
                continue
            try:
                published = parsedate_to_datetime(pub_date)
            except (TypeError, ValueError):
                # Invalid dates raise TypeError or ValueError depending on
                # the Python version.
                continue
            if published.date() >= earliest:
                articles.append({
                    'title': title, 'link': link,
                    'pub_date': pub_date,
                    'timestamp': published.isoformat()
                })
        return articles

    def _build_queries(self, ticker):
        """Return the search queries for `ticker`: the bare ticker followed
        by "<ticker> <suffix>" for every entry in QUERY_SUFFIXES."""
        return [ticker] + [f"{ticker} {s}" for s in self.QUERY_SUFFIXES]

    @staticmethod
    def _feed_url(query):
        """Build the Google News RSS search URL for `query`.

        quote_plus percent-encodes reserved characters (e.g. the "^" of
        index tickers such as "^NSEI") and turns spaces into "+".
        """
        return (
            f"https://news.google.com/rss/search?q={quote_plus(query)}"
            f"&hl=en-US&gl=US&ceid=US:en"
        )

    async def scrape(self, ticker, lookback_date, progress_cb=None):
        """Fetch all query feeds for `ticker` and return unique articles.

        ticker:        symbol used to build the search queries
        lookback_date: datetime forwarded to parse_feed()
        progress_cb:   optional callable(fraction, count) invoked after
                       every batch of feeds

        Returns at most `self.limit` article dicts, unique by link, in the
        order they were first seen.
        """
        urls = [self._feed_url(q) for q in self._build_queries(ticker)]
        all_articles = []
        seen = set()
        conn = aiohttp.TCPConnector(limit=50, ssl=self.ssl_context)
        async with aiohttp.ClientSession(connector=conn) as session:
            for start in range(0, len(urls), 20):
                if len(all_articles) >= self.limit:
                    break
                batch = urls[start:start + 20]
                results = await asyncio.gather(
                    *(self.fetch_feed(session, u) for u in batch),
                    return_exceptions=True
                )
                for xml in results:
                    # Unexpected per-feed errors come back as exception
                    # objects (return_exceptions=True) and are skipped.
                    if isinstance(xml, Exception) or not xml:
                        continue
                    for art in self.parse_feed(xml, lookback_date):
                        if art['link'] not in seen:
                            seen.add(art['link'])
                            all_articles.append(art)
                if progress_cb:
                    progress_cb(
                        min(len(all_articles) / self.limit, 1.0),
                        len(all_articles)
                    )
                # Short pause between batches.
                await asyncio.sleep(0.1)
        print(f"[Scraper] {len(all_articles)} unique articles")
        return all_articles[:self.limit]
|
|
|
|
|
|
|
|
|
|
|
|
|
class ContentExtractor:
    """Download article pages and reduce them to plain body text."""

    def __init__(self, verify_ssl=False):
        """
        verify_ssl: when True, keep the default certificate and hostname
                    verification; the default (False) preserves the
                    historical behaviour of disabling both
        """
        self.ssl_ctx = ssl.create_default_context()
        if not verify_ssl:
            # SECURITY: verification is off by default, so fetched pages
            # are not authenticated.
            self.ssl_ctx.check_hostname = False
            self.ssl_ctx.verify_mode = ssl.CERT_NONE

    async def _fetch_one(self, session, url):
        """Return the extracted text for `url`, or "" on a non-200 status
        or a network/timeout/decoding failure."""
        try:
            async with session.get(
                url, timeout=aiohttp.ClientTimeout(total=10),
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
                },
                allow_redirects=True
            ) as resp:
                if resp.status == 200:
                    html = await resp.text()
                    return self._parse_html(html)
        except (aiohttp.ClientError, asyncio.TimeoutError, UnicodeDecodeError):
            # Best effort: a dead link just yields no content. Cancellation
            # and KeyboardInterrupt are not caught here.
            return ""
        return ""

    def _parse_html(self, html):
        """Extract paragraph text from an HTML document.

        Script/style/navigation-like tags are removed, then <p> elements
        longer than 30 characters are joined with spaces -- taken from the
        first <article> element when one exists, else from the whole page.
        The result is truncated to 3000 characters. Returns "" if parsing
        fails.
        """
        try:
            soup = BeautifulSoup(html, 'html.parser')
            for tag in soup(
                ['script', 'style', 'nav', 'header', 'footer',
                 'aside', 'iframe', 'noscript', 'form']
            ):
                tag.decompose()
            container = soup.find('article') or soup
            texts = (p.get_text(strip=True) for p in container.find_all('p'))
            return ' '.join(t for t in texts if len(t) > 30)[:3000]
        except Exception:
            # Parser failures on malformed markup are treated as "no
            # content"; unlike a bare except this lets KeyboardInterrupt
            # and SystemExit through.
            return ""

    async def extract_all(self, articles):
        """Fetch body text for every article, ten requests at a time.

        articles: list of dicts, each with a 'link' key

        Each dict gets a 'content' key set in place ("" when the fetch
        failed or raised); the same list is returned.
        """
        conn = aiohttp.TCPConnector(limit=25, ssl=self.ssl_ctx)
        async with aiohttp.ClientSession(connector=conn) as session:
            for start in range(0, len(articles), 10):
                batch = articles[start:start + 10]
                results = await asyncio.gather(
                    *(self._fetch_one(session, a['link']) for a in batch),
                    return_exceptions=True
                )
                for article, content in zip(batch, results):
                    # Exceptions are returned as objects by gather(); only
                    # real strings are stored.
                    article['content'] = (
                        content if isinstance(content, str) else ""
                    )
                # Short pause between batches.
                await asyncio.sleep(0.15)
        return articles
|
|
|
|
|
|
|
|
|
|
|
|
|
class Features:
    """Turn article title/content/timestamp columns into numeric features."""

    # Keyword lists are matched as lowercase substrings of
    # "title + ' ' + content" (so e.g. 'buy' also matches 'buyback').
    BULL = [
        'upgrade', 'buy', 'outperform', 'beat', 'surge', 'soar',
        'rally', 'breakout', 'strong', 'growth', 'profit', 'record',
        'bullish', 'positive', 'raise', 'upside', 'optimistic',
        'boom', 'gain', 'higher', 'best', 'partnership', 'deal',
        'approval', 'dividend', 'buyback', 'expansion', 'launch',
        'breakthrough', 'recovery', 'momentum', 'confidence',
    ]
    BEAR = [
        'downgrade', 'sell', 'underperform', 'miss', 'plunge',
        'crash', 'tumble', 'breakdown', 'weak', 'decline', 'loss',
        'bearish', 'negative', 'cut', 'lower', 'worst', 'risk',
        'warning', 'concern', 'fear', 'recession', 'lawsuit',
        'investigation', 'fraud', 'layoff', 'restructuring',
        'debt', 'default', 'bankruptcy', 'slump', 'drop', 'falling',
        'disappointing', 'headwind', 'pressure', 'downturn',
        'uncertainty', 'volatile', 'overvalued',
    ]
    URGENT = [
        'breaking', 'alert', 'urgent', 'exclusive', 'flash',
        'developing', 'critical', 'emergency',
    ]
    # Publisher name (searched in the lowercase title) -> tier weight.
    SOURCES = {
        'reuters': 3, 'bloomberg': 3, 'wsj': 3,
        'wall street journal': 3, 'financial times': 3,
        'cnbc': 2, 'marketwatch': 2, 'seeking alpha': 2,
        'barrons': 2, 'yahoo finance': 2,
        'benzinga': 1, 'motley fool': 1, 'zacks': 1, 'tipranks': 1,
    }
    EARNINGS_WORDS = [
        'earnings', 'eps', 'revenue', 'quarterly', 'q1', 'q2',
        'q3', 'q4', 'fiscal', 'results', 'guidance', 'outlook',
    ]
    ANALYST_WORDS = [
        'analyst', 'rating', 'price target', 'upgrade',
        'downgrade', 'initiate', 'coverage', 'overweight',
        'underweight',
    ]

    def __init__(self, ticker):
        # Stored lowercase because it is matched against lowercased text.
        self.ticker = ticker.lower()

    @staticmethod
    def _count_terms(text, terms):
        """For each string in the Series `text`, count how many entries of
        `terms` occur in it as substrings (each term counts at most once)."""
        return text.apply(lambda x: sum(w in x for w in terms))

    @staticmethod
    def _sentiment(series, index):
        """Return (polarity, subjectivity) float Series for `series`,
        running TextBlob once per value."""
        scores = [TextBlob(str(v)).sentiment for v in series]
        polarity = pd.Series(
            [s.polarity for s in scores], index=index, dtype=float
        )
        subjectivity = pd.Series(
            [s.subjectivity for s in scores], index=index, dtype=float
        )
        return polarity, subjectivity

    def build(self, df):
        """Extract all features. Input must have: title, content, timestamp.

        Returns a copy of `df` with the feature columns added; missing
        title/content values are replaced by empty strings in the copy.
        """
        df = df.copy()
        df['content'] = df['content'].fillna('')
        df['title'] = df['title'].fillna('')
        text = (df['title'] + ' ' + df['content']).str.lower()
        title_low = df['title'].str.lower()

        # --- Sentiment: one TextBlob parse per text yields both scores ---
        df['sent_title'], df['subj_title'] = self._sentiment(
            df['title'], df.index
        )
        df['sent_content'], df['subj_content'] = self._sentiment(
            df['content'], df.index
        )
        df['sent_combined'] = (
            df['sent_title'] * 0.6 + df['sent_content'] * 0.4
        )
        df['sent_abs'] = df['sent_combined'].abs()
        # Positive when title and body polarity share a sign.
        df['sent_agreement'] = (
            df['sent_title'] * df['sent_content']
        )

        # --- Keyword counts ---
        df['n_bull'] = self._count_terms(text, self.BULL)
        df['n_bear'] = self._count_terms(text, self.BEAR)
        df['n_urgent'] = self._count_terms(text, self.URGENT)
        df['bull_bear_net'] = df['n_bull'] - df['n_bear']
        # The denominator is floored at 1 to avoid division by zero.
        df['bull_bear_ratio'] = (
            df['n_bull'] / df['n_bear'].clip(lower=1)
        )
        df['sent_kw_align'] = (
            df['sent_combined'] * df['bull_bear_net']
        )

        # --- Text shape ---
        df['len_title'] = df['title'].str.len()
        df['len_content'] = df['content'].str.len()
        df['words_title'] = df['title'].str.split().str.len().fillna(0)
        df['words_content'] = df['content'].str.split().str.len().fillna(0)
        raw_text = df['title'] + df['content']
        df['n_excl'] = raw_text.str.count('!')
        df['n_quest'] = raw_text.str.count(r'\?')
        df['caps_ratio'] = df['title'].apply(
            lambda x: sum(c.isupper() for c in str(x))
            / max(len(str(x)), 1)
        )
        df['n_numbers'] = text.apply(
            lambda x: len(re.findall(r'\d+\.?\d*', x))
        )
        df['n_dollar'] = text.apply(
            lambda x: len(re.findall(r'\$[\d,.]+', x))
        )
        df['n_percent'] = text.apply(
            lambda x: len(re.findall(r'[\d.]+\s*%', x))
        )

        # --- Ticker relevance ---
        df['ticker_in_title'] = title_low.str.contains(
            self.ticker, regex=False
        ).astype(int)
        # The ticker is escaped because str.count treats its argument as a
        # regular expression.
        df['ticker_mentions'] = text.str.count(
            re.escape(self.ticker)
        )

        # --- Source tier: highest tier among publishers named in the title ---
        df['source_tier'] = title_low.apply(
            lambda x: max(
                (tier for src, tier in self.SOURCES.items() if src in x),
                default=0
            )
        )

        # --- Timing ---
        # NOTE: timestamps are converted to UTC, so the hour-based flags
        # below describe UTC hours, not exchange-local trading hours.
        ts = pd.to_datetime(df['timestamp'], errors='coerce', utc=True)
        df['hour'] = ts.dt.hour.fillna(12).astype(int)
        df['dow'] = ts.dt.dayofweek.fillna(0).astype(int)
        df['is_weekend'] = (df['dow'] >= 5).astype(int)
        df['is_market_hrs'] = (
            (df['hour'] >= 9) & (df['hour'] <= 16)
        ).astype(int)
        df['is_premarket'] = (
            (df['hour'] >= 4) & (df['hour'] < 9)
        ).astype(int)

        # --- Topic counts (number of matching words, not 0/1 flags) ---
        df['is_earnings'] = self._count_terms(text, self.EARNINGS_WORDS)
        df['is_analyst'] = self._count_terms(text, self.ANALYST_WORDS)

        # --- Interactions ---
        df['sent_x_urgent'] = df['sent_combined'] * df['n_urgent']
        df['sent_x_source'] = df['sent_combined'] * df['source_tier']
        df['sent_x_ticker'] = (
            df['sent_combined'] * df['ticker_mentions']
        )

        return df
|
|
|
|
|
|
|
|
|
|
|
|
|
class DataPipeline:
    """
    Handles the entire data flow with STRICT temporal ordering:
    - Train: articles from day 1 to cutoff
    - Test: articles from cutoff to present
    - NO future data leaks into training
    """

    # Text features produced by Features.build() followed by the market
    # context columns added by _add_market_context().
    FEATURE_COLS = [
        'sent_title', 'subj_title', 'sent_content', 'subj_content',
        'sent_combined', 'sent_abs', 'sent_agreement',
        'n_bull', 'n_bear', 'n_urgent', 'bull_bear_net',
        'bull_bear_ratio', 'sent_kw_align',
        'len_title', 'len_content', 'words_title', 'words_content',
        'n_excl', 'n_quest', 'caps_ratio', 'n_numbers',
        'n_dollar', 'n_percent',
        'ticker_in_title', 'ticker_mentions', 'source_tier',
        'hour', 'dow', 'is_weekend', 'is_market_hrs', 'is_premarket',
        'is_earnings', 'is_analyst',
        'sent_x_urgent', 'sent_x_source', 'sent_x_ticker',

        'vol_5d', 'vol_change', 'price_sma5', 'price_sma20',
        'ret_1d', 'ret_5d',
    ]

    def __init__(self, ticker, train_days=120, test_days=14):
        """
        ticker: stock symbol (e.g. "^NSEI" for Nifty 50)
        train_days: how many days back for training articles
        test_days: how many recent days for test articles
        """
        self.ticker = ticker
        self.train_days = train_days
        self.test_days = test_days
        self.scraper = NewsScraper(limit=600)
        self.extractor = ContentExtractor()
        self.features = Features(ticker)
        # Labelled training frame of the latest build_dataset() run (row
        # aligned with X_train); None until a dataset has been built.
        self.train_df = None

    async def build_dataset(self):
        """
        Returns (X_train, y_train_dir, y_train_hh, y_train_ret,
                 X_test, test_df, price_df)

        TEMPORAL SPLIT:
        ├── train_start ──────── cutoff_date ──── today ──┤
        │   TRAIN articles       │   TEST articles        │
        │   labels = next day ret│   labels = predict     │

        Rows are ordered by publication time so the positional
        walk-forward folds used downstream are chronological.

        Raises ValueError when no price data or no articles are found, or
        when fewer than 20 training articles remain after the split.
        """
        now = datetime.now()
        train_start = now - timedelta(days=self.train_days + self.test_days)
        cutoff_date = now - timedelta(days=self.test_days)
        test_start = cutoff_date

        def report(_fraction, count):
            print(f"\r Scraping: {count} articles", end='', flush=True)

        print(f"\n{'='*60}")
        print(f" DATASET CONSTRUCTION")
        print(f" Ticker: {self.ticker}")
        print(f" Train range: {train_start.date()} → {cutoff_date.date()}")
        print(f" Test range: {cutoff_date.date()} → {now.date()}")
        print(f"{'='*60}")

        print("\n[1/6] Downloading price data...")
        price_df = self._get_prices(train_start)
        if price_df.empty:
            raise ValueError(f"No price data for {self.ticker}")
        print(f" → {len(price_df)} trading days loaded")
        print(f" → Price range: {price_df['Close'].min():.2f} - {price_df['Close'].max():.2f}")

        print("\n[2/6] Scraping TRAINING articles...")
        train_articles = await self.scraper.scrape(
            self.ticker, train_start, progress_cb=report
        )
        print(f"\n → {len(train_articles)} raw training articles")

        print("\n[3/6] Scraping TEST articles...")
        # A second scraper with its own limit, restricted to the test
        # window.
        test_scraper = NewsScraper(limit=600)
        test_articles = await test_scraper.scrape(
            self.ticker, test_start, progress_cb=report
        )
        print(f"\n → {len(test_articles)} raw test articles")

        print("\n[4/6] Extracting article content...")
        # Both scrapes run the same queries, so recent articles appear in
        # both lists. Keep the first occurrence of each link; otherwise
        # pages are fetched twice and test rows are duplicated.
        unique = {}
        for art in train_articles + test_articles:
            unique.setdefault(art['link'], art)
        all_articles = list(unique.values())
        if not all_articles:
            raise ValueError(
                f"No articles found for {self.ticker}. "
                f"Check the ticker or your network connection."
            )
        all_articles = await self.extractor.extract_all(all_articles)

        n_with_content = sum(
            1 for a in all_articles if len(a.get('content', '')) > 50
        )
        print(f" → {n_with_content}/{len(all_articles)} have body text")

        print("\n[5/6] Applying strict temporal split...")
        df = pd.DataFrame(all_articles)
        df['ts'] = pd.to_datetime(
            df['timestamp'], errors='coerce', utc=True
        )
        df = df.dropna(subset=['ts'])
        # Scrape order follows the query list, not time; sort so that
        # positional splits downstream never validate on the past.
        df = df.sort_values('ts', kind='stable').reset_index(drop=True)
        df['date'] = df['ts'].dt.date

        cutoff = cutoff_date.date()
        train_mask = df['date'] < cutoff
        train_df = df[train_mask].copy().reset_index(drop=True)
        # Links are unique and the masks are complementary, so the two
        # frames cannot share an article.
        test_df = df[~train_mask].copy().reset_index(drop=True)

        print(f" → Train articles: {len(train_df)}")
        print(f" → Test articles: {len(test_df)}")
        print(f" → Cutoff date: {cutoff}")
        print(f" → Train date range: {train_df['date'].min()} → {train_df['date'].max()}")
        print(f" → Test date range: {test_df['date'].min()} → {test_df['date'].max()}")

        if len(train_df) < 20:
            raise ValueError(
                f"Only {len(train_df)} training articles — need more data. "
                f"Try increasing train_days or checking if the ticker is valid."
            )

        print("\n[6/6] Engineering features and labels...")
        train_df = self.features.build(train_df)
        test_df = self.features.build(test_df)

        train_df = self._add_market_context(train_df, price_df)
        test_df = self._add_market_context(test_df, price_df)

        # Only training rows are labelled; test rows are what gets
        # predicted.
        train_df = self._add_labels(train_df, price_df)

        before = len(train_df)
        train_df = train_df.dropna(
            subset=['next_ret']
        ).reset_index(drop=True)
        print(f" → Dropped {before - len(train_df)} unlabeled training rows")
        print(f" → Final train size: {len(train_df)}")
        print(f" → Final test size: {len(test_df)}")
        self.train_df = train_df

        avail_feats = [
            c for c in self.FEATURE_COLS if c in train_df.columns
        ]
        print(f" → Features used: {len(avail_feats)}")

        # Missing and infinite values (e.g. pct_change over a zero volume)
        # are zeroed.
        X_train = train_df[avail_feats].fillna(0).replace(
            [np.inf, -np.inf], 0
        )
        X_test = test_df[avail_feats].fillna(0).replace(
            [np.inf, -np.inf], 0
        )

        y_dir = (train_df['next_ret'] > 0).astype(int)
        # "Heavy hitter" = absolute next-day move above 1.5%.
        y_hh = (train_df['next_ret'].abs() > 0.015).astype(int)
        y_ret = train_df['next_ret'].astype(float)

        print(f"\n LABEL DISTRIBUTIONS (Training):")
        print(f" Direction UP: {y_dir.mean()*100:.1f}%")
        print(f" Heavy hitters: {y_hh.mean()*100:.1f}%")
        print(f" Mean return: {y_ret.mean()*100:.4f}%")
        print(f" Std return: {y_ret.std()*100:.4f}%")
        print(f" Min return: {y_ret.min()*100:.4f}%")
        print(f" Max return: {y_ret.max()*100:.4f}%")

        return X_train, y_dir, y_hh, y_ret, X_test, test_df, price_df

    def _get_prices(self, start_date):
        """Download OHLCV from yfinance.

        History starts 30 days before `start_date` so the rolling windows
        have data to warm up on. Returns the frame with 'Date' as plain
        date objects plus derived return/volatility/moving-average
        columns, or the empty frame unchanged when nothing was downloaded.
        """
        stock = yf.Ticker(self.ticker)
        df = stock.history(
            start=start_date - timedelta(days=30),
            end=datetime.now() + timedelta(days=2),
            auto_adjust=True
        )
        if df.empty:
            return df
        df = df.reset_index()

        # Drop timezone information before reducing to calendar dates.
        if getattr(df['Date'].dtype, 'tz', None) is not None:
            df['Date'] = df['Date'].dt.tz_localize(None)
        df['Date'] = pd.to_datetime(df['Date']).dt.date

        close = df['Close']
        df['ret'] = close.pct_change()
        df['ret_5d'] = close.pct_change(5)
        df['vol_5d'] = df['ret'].rolling(5).std()
        df['vol_20d'] = df['ret'].rolling(20).std()
        df['vol_change'] = df['Volume'].pct_change()
        df['sma5'] = close.rolling(5).mean()
        df['sma20'] = close.rolling(20).mean()
        df['price_sma5'] = close / df['sma5'] - 1
        df['price_sma20'] = close / df['sma20'] - 1
        return df

    def _add_market_context(self, articles_df, price_df):
        """
        Add market features using ONLY data BEFORE the article date.
        This prevents look-ahead bias.

        Each article is matched to the latest trading day strictly before
        its publication date: the same-day bar includes the close, which
        is not known when an article appears intraday. Articles older than
        the whole price history get NaN context rather than values from a
        later session.

        Adds 'prior_td' and the six market columns in place and returns
        `articles_df`.
        """
        trading_dates = sorted(price_df['Date'].unique())

        def prior_trading_date(pub_date):
            # bisect_left = index of the first session >= pub_date, so the
            # element before it is the last session strictly earlier.
            pos = bisect_left(trading_dates, pub_date)
            return trading_dates[pos - 1] if pos else None

        articles_df['prior_td'] = [
            prior_trading_date(d) for d in articles_df['date']
        ]

        price_map = price_df.set_index('Date')
        # (feature column, source column in price_df)
        for col, src in [
            ('vol_5d', 'vol_5d'),
            ('vol_change', 'vol_change'),
            ('price_sma5', 'price_sma5'),
            ('price_sma20', 'price_sma20'),
            ('ret_1d', 'ret'),
            ('ret_5d', 'ret_5d'),
        ]:
            mapping = price_map[src].to_dict() if src in price_map.columns else {}
            articles_df[col] = articles_df['prior_td'].map(mapping)

        return articles_df

    def _add_labels(self, articles_df, price_df):
        """
        Label = return from article's NEXT trading day.
        Article published on day T → label = close(T+1)/close(T) - 1

        When T is not a trading day, the first trading day after it is
        used as T. The label is NaN when either session is missing from
        the price history or the base close is not positive.

        Adds 'next_ret' in place and returns `articles_df`.
        """
        trading_dates = sorted(price_df['Date'].unique())
        closes = price_df.set_index('Date')['Close'].to_dict()

        def next_day_return(pub_date):
            # First session on or after the publication date.
            pos = bisect_left(trading_dates, pub_date)
            if pos + 1 >= len(trading_dates):
                return np.nan
            c0 = closes.get(trading_dates[pos])
            c1 = closes.get(trading_dates[pos + 1])
            if c0 and c1 and c0 > 0:
                return c1 / c0 - 1
            return np.nan

        articles_df['next_ret'] = [
            next_day_return(d) for d in articles_df['date']
        ]
        return articles_df
|
|
|
|
|
|
|
|
|
|
|
|
|
class StockNewsModel:
    """
    Three LightGBM models:
    1. Direction: will next day be UP? (binary)
    2. Heavy hitter: will move be >1.5%? (binary)
    3. Return: predicted return magnitude (regression)

    Training uses walk-forward validation on the training set.
    Final evaluation on held-out temporal test set.
    """

    # Walk-forward layout: the first fold trains on the first 60% of rows,
    # and every fold validates on the next 10%.
    MIN_TRAIN_PCT = 0.6
    VAL_SIZE_PCT = 0.1

    # Hyper-parameters shared by the classifiers and the regressor.
    _BASE_PARAMS = {
        'boosting_type': 'gbdt',
        'num_leaves': 24,
        'learning_rate': 0.03,
        'feature_fraction': 0.7,
        'bagging_fraction': 0.7,
        'bagging_freq': 5,
        'min_child_samples': 20,
        'reg_alpha': 0.3,
        'reg_lambda': 0.3,
        'verbose': -1,
        'n_estimators': 500,
        'random_state': 42,
    }

    # Columns _predict() adds to the test frame.
    _PRED_COLS = (
        'pred_dir', 'pred_dir_prob', 'pred_dir_label',
        'pred_hh', 'pred_hh_prob',
        'pred_ret', 'pred_ret_pct', 'impact',
    )

    def __init__(self):
        self.m_dir = None
        self.m_hh = None
        self.m_ret = None
        self.feature_names = None
        self.metrics = {}

    def train_and_evaluate(
        self, X_train, y_dir, y_hh, y_ret, X_test, test_df
    ):
        """
        1. Walk-forward CV on training data (internal validation)
        2. Train final models on ALL training data
        3. Predict on test set (never seen during training)

        Returns the frame produced by _predict(): `test_df` plus the
        prediction columns, sorted by descending impact.
        """
        self.feature_names = list(X_train.columns)
        n_train = len(X_train)
        n_test = len(X_test)

        print(f"\n{'='*60}")
        print(f" MODEL TRAINING")
        print(f" Training samples: {n_train}")
        print(f" Test samples: {n_test}")
        print(f" Features: {len(self.feature_names)}")
        print(f"{'='*60}")

        print(f"\n{'─'*60}")
        print(f" Model 1: DIRECTION (Up/Down Tomorrow)")
        print(f"{'─'*60}")
        self.m_dir = self._train_classifier(
            X_train, y_dir, "direction"
        )

        print(f"\n{'─'*60}")
        print(f" Model 2: HEAVY HITTER (>1.5% Move)")
        print(f"{'─'*60}")
        # Re-weight the positive class by the negative/positive ratio.
        n_pos = y_hh.sum()
        n_neg = len(y_hh) - n_pos
        scale = n_neg / max(n_pos, 1)
        print(f" Class balance: {n_pos} positive / {n_neg} negative")
        print(f" scale_pos_weight: {scale:.2f}")
        self.m_hh = self._train_classifier(
            X_train, y_hh, "heavy_hitter",
            extra_params={'scale_pos_weight': scale}
        )

        print(f"\n{'─'*60}")
        print(f" Model 3: RETURN REGRESSOR")
        print(f"{'─'*60}")
        self.m_ret = self._train_regressor(X_train, y_ret)

        print(f"\n{'='*60}")
        print(f" TEST SET PREDICTIONS ({n_test} articles)")
        print(f"{'='*60}")
        return self._predict(X_test, test_df)

    @classmethod
    def _fold_count(cls):
        """Number of walk-forward folds implied by the class constants."""
        return int((1.0 - cls.MIN_TRAIN_PCT) / cls.VAL_SIZE_PCT)

    @classmethod
    def _walk_forward_splits(cls, n):
        """Return [(fold, train_end, val_end), ...] for `n` ordered rows.

        Fold k trains on rows [0, train_end) and validates on rows
        [train_end, val_end); folds with an empty validation slice are
        omitted.
        """
        splits = []
        for fold in range(cls._fold_count()):
            train_end = int(n * (cls.MIN_TRAIN_PCT + fold * cls.VAL_SIZE_PCT))
            val_end = min(
                int(n * (cls.MIN_TRAIN_PCT + (fold + 1) * cls.VAL_SIZE_PCT)),
                n,
            )
            if train_end < val_end:
                splits.append((fold, train_end, val_end))
        return splits

    @staticmethod
    def _impact_score(hh_prob, pred_ret, dir_prob):
        """Blend the three model outputs into one ranking score:
        40% heavy-hitter probability, 30% |predicted return| scaled by 20,
        30% direction confidence (distance of the probability from 0.5,
        doubled)."""
        return (
            hh_prob * 0.4
            + pred_ret.abs() * 20 * 0.3
            + (dir_prob - 0.5).abs() * 2 * 0.3
        )

    @staticmethod
    def _print_importance(model, X, name):
        """Print the ten most important features of `model` with a bar
        scaled to the largest importance."""
        imp = pd.DataFrame({
            'feature': X.columns,
            'importance': model.feature_importances_
        }).sort_values('importance', ascending=False)
        top = max(imp['importance'].max(), 1)
        print(f"\n Top 10 features ({name}):")
        for _, row in imp.head(10).iterrows():
            bar = '█' * int(row['importance'] / top * 20)
            print(f" {row['feature']:25s} {row['importance']:6.0f} {bar}")

    def _train_classifier(self, X, y, name, extra_params=None):
        """Walk-forward CV + final model.

        X, y:         training features / binary labels, rows assumed to
                      be in chronological order
        name:         key under which the CV means are stored in
                      self.metrics
        extra_params: optional LightGBM parameter overrides

        Returns an LGBMClassifier fitted on all rows.
        """
        params = dict(
            self._BASE_PARAMS, objective='binary', metric='binary_logloss'
        )
        if extra_params:
            params.update(extra_params)

        n = len(X)
        print(f"\n Walk-forward validation ({self._fold_count()} folds):")

        fold_results = []
        for fold, train_end, val_end in self._walk_forward_splits(n):
            X_tr, y_tr = X.iloc[:train_end], y.iloc[:train_end]
            X_vl, y_vl = X.iloc[train_end:val_end], y.iloc[train_end:val_end]

            # Metrics are undefined on tiny or single-class slices, and a
            # binary model cannot be fitted on a single class.
            if len(y_vl) < 5 or y_vl.nunique() < 2 or y_tr.nunique() < 2:
                continue

            mdl = lgb.LGBMClassifier(**params)
            mdl.fit(
                X_tr, y_tr,
                eval_set=[(X_vl, y_vl)],
                callbacks=[
                    lgb.early_stopping(30, verbose=False),
                    lgb.log_evaluation(0),
                ],
            )

            pred = mdl.predict(X_vl)
            prob = mdl.predict_proba(X_vl)[:, 1]
            acc = accuracy_score(y_vl, pred)
            try:
                auc = roc_auc_score(y_vl, prob)
            except ValueError:
                # AUC could not be computed; fall back to chance level.
                auc = 0.5
            f1 = f1_score(y_vl, pred, zero_division=0)

            fold_results.append({
                'acc': acc, 'auc': auc, 'f1': f1,
                'train_n': len(X_tr), 'val_n': len(X_vl)
            })
            print(
                f" Fold {fold+1}: Acc={acc:.4f} AUC={auc:.4f} "
                f"F1={f1:.4f} (train={len(X_tr)}, val={len(X_vl)})"
            )

        if fold_results:
            avg_acc = np.mean([f['acc'] for f in fold_results])
            avg_auc = np.mean([f['auc'] for f in fold_results])
            avg_f1 = np.mean([f['f1'] for f in fold_results])
            print(f"\n CV MEAN: Acc={avg_acc:.4f} AUC={avg_auc:.4f} F1={avg_f1:.4f}")
            self.metrics[name] = {
                'cv_acc': avg_acc, 'cv_auc': avg_auc, 'cv_f1': avg_f1
            }

        # The final model has no validation slice left, so it runs the
        # full n_estimators without early stopping.
        print(f"\n Training final model on all {n} samples...")
        final = lgb.LGBMClassifier(**params)
        final.fit(X, y)

        self._print_importance(final, X, name)
        return final

    def _train_regressor(self, X, y):
        """Walk-forward CV for regression.

        Stores the CV means under self.metrics['return'] and returns an
        LGBMRegressor fitted on all rows.
        """
        params = dict(
            self._BASE_PARAMS, objective='regression', metric='rmse'
        )

        n = len(X)
        print(f"\n Walk-forward validation ({self._fold_count()} folds):")

        fold_results = []
        for fold, train_end, val_end in self._walk_forward_splits(n):
            X_tr, y_tr = X.iloc[:train_end], y.iloc[:train_end]
            X_vl, y_vl = X.iloc[train_end:val_end], y.iloc[train_end:val_end]

            if len(y_vl) < 5:
                continue

            mdl = lgb.LGBMRegressor(**params)
            mdl.fit(
                X_tr, y_tr,
                eval_set=[(X_vl, y_vl)],
                callbacks=[
                    lgb.early_stopping(30, verbose=False),
                    lgb.log_evaluation(0),
                ],
            )

            pred = mdl.predict(X_vl)
            rmse = np.sqrt(mean_squared_error(y_vl, pred))
            # Share of rows where predicted and realised return agree in
            # sign.
            dir_acc = np.mean(np.sign(pred) == np.sign(y_vl))
            # Correlation is undefined for constant inputs; report 0
            # instead of letting NaN poison the fold average.
            if len(y_vl) > 2 and np.std(pred) > 0 and np.std(y_vl) > 0:
                corr = np.corrcoef(y_vl, pred)[0, 1]
            else:
                corr = 0

            fold_results.append({
                'rmse': rmse, 'dir_acc': dir_acc, 'corr': corr
            })
            print(
                f" Fold {fold+1}: RMSE={rmse:.6f} "
                f"DirAcc={dir_acc:.4f} Corr={corr:.4f}"
            )

        if fold_results:
            avg_rmse = np.mean([f['rmse'] for f in fold_results])
            avg_dir = np.mean([f['dir_acc'] for f in fold_results])
            avg_corr = np.mean([f['corr'] for f in fold_results])
            print(f"\n CV MEAN: RMSE={avg_rmse:.6f} DirAcc={avg_dir:.4f} Corr={avg_corr:.4f}")
            self.metrics['return'] = {
                'cv_rmse': avg_rmse, 'cv_dir_acc': avg_dir, 'cv_corr': avg_corr
            }

        print(f"\n Training final model on all {n} samples...")
        final = lgb.LGBMRegressor(**params)
        final.fit(X, y)

        self._print_importance(final, X, 'return')
        return final

    def _predict(self, X_test, test_df):
        """Run all 3 models on test data.

        Returns a copy of `test_df` with the prediction columns added,
        sorted by descending impact. An empty test set yields the copy
        with empty prediction columns instead of calling the models.
        """
        res = test_df.copy()
        if len(X_test) == 0:
            for col in self._PRED_COLS:
                res[col] = pd.Series(dtype=float)
            return res

        res['pred_dir'] = self.m_dir.predict(X_test)
        res['pred_dir_prob'] = self.m_dir.predict_proba(X_test)[:, 1]
        res['pred_dir_label'] = res['pred_dir'].map(
            {1: 'UP', 0: 'DOWN'}
        )

        res['pred_hh'] = self.m_hh.predict(X_test)
        res['pred_hh_prob'] = self.m_hh.predict_proba(X_test)[:, 1]

        res['pred_ret'] = self.m_ret.predict(X_test)
        res['pred_ret_pct'] = res['pred_ret'] * 100

        res['impact'] = self._impact_score(
            res['pred_hh_prob'], res['pred_ret'], res['pred_dir_prob']
        )
        return res.sort_values('impact', ascending=False)

    def predict_new(self, X):
        """Predict on brand new data.

        Returns a frame indexed like `X` with pred_dir, pred_dir_prob,
        pred_hh, pred_hh_prob, pred_ret and impact (not sorted).
        """
        res = pd.DataFrame(index=X.index)
        res['pred_dir'] = self.m_dir.predict(X)
        res['pred_dir_prob'] = self.m_dir.predict_proba(X)[:, 1]
        res['pred_hh'] = self.m_hh.predict(X)
        res['pred_hh_prob'] = self.m_hh.predict_proba(X)[:, 1]
        res['pred_ret'] = self.m_ret.predict(X)
        res['impact'] = self._impact_score(
            res['pred_hh_prob'], res['pred_ret'], res['pred_dir_prob']
        )
        return res

    def save(self, path):
        """Pickle the three models, the feature names and the CV metrics
        to `path`.

        NOTE: pickle files execute code when loaded; only load model files
        from a trusted source.
        """
        with open(path, 'wb') as f:
            pickle.dump({
                'm_dir': self.m_dir,
                'm_hh': self.m_hh,
                'm_ret': self.m_ret,
                'features': self.feature_names,
                'metrics': self.metrics,
            }, f)
        print(f"Saved → {path}")
|
|
|
|
|
|
|
|
|
|
|
|
|
def display_predictions(results, ticker, top_n=25):
    """Show ranked predictions with heavy hitters highlighted.

    results: prediction frame with pred_hh, pred_dir, pred_dir_prob,
             pred_ret and impact columns; rows are printed in the order
             given, so the caller is expected to pass them ranked
    ticker:  symbol shown in the header
    top_n:   number of leading rows listed individually

    Prints a summary, the first `top_n` rows and an impact-weighted
    overall signal. Prints "No predictions." and returns when `results`
    is empty.
    """
    total = len(results)
    if total == 0:
        print("No predictions.")
        return

    n_hh = (results['pred_hh'] == 1).sum()
    n_up = (results['pred_dir'] == 1).sum()
    n_down = total - n_up

    print(f"\n{'='*80}")
    print(f" PREDICTIONS: {ticker}")
    print(f" {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(f"{'='*80}")
    print(f" Articles analyzed: {total}")
    print(f" Heavy hitters: {n_hh} ({n_hh/total*100:.1f}%)")
    print(f" Bullish: {n_up} ({n_up/total*100:.1f}%)")
    print(f" Bearish: {n_down} ({n_down/total*100:.1f}%)")

    if 'pred_ret' in results.columns:
        print(f" Mean pred return: {results['pred_ret'].mean()*100:+.4f}%")

    print(f"\n{'─'*80}")
    print(f" TOP {min(top_n, total)} ARTICLES BY PREDICTED IMPACT")
    print(f"{'─'*80}")

    for rank, (_, row) in enumerate(results.head(top_n).iterrows(), start=1):
        title = str(row.get('title', ''))[:75]
        direction = row.get('pred_dir_label', '?')
        hh = row.get('pred_hh_prob', 0)
        ret = row.get('pred_ret_pct', 0)
        imp = row.get('impact', 0)
        sent = row.get('sent_combined', 0)

        d_icon = "🟢" if direction == "UP" else "🔴"
        h_icon = "🔥" if hh > 0.5 else " "

        print(f"\n {rank:2d}. {h_icon}{d_icon} {title}")
        print(
            f" Impact={imp:.3f} HH={hh:.3f} "
            f"Ret={ret:+.3f}% Sent={sent:+.3f}"
        )

    print(f"\n{'='*80}")
    # Impact-weighted averages; plain means when every impact is zero.
    impact_total = results['impact'].sum()
    if impact_total > 0:
        w_dir = (
            (results['pred_dir_prob'] * results['impact']).sum()
            / impact_total
        )
        w_ret = (
            (results['pred_ret'] * results['impact']).sum()
            / impact_total
        ) * 100
    else:
        w_dir = results['pred_dir_prob'].mean()
        w_ret = results['pred_ret'].mean() * 100

    # A weighted up-probability within 0.05 of 0.5 is reported as neutral.
    if w_dir > 0.55:
        sig, icon = "BULLISH", "🟢"
    elif w_dir < 0.45:
        sig, icon = "BEARISH", "🔴"
    else:
        sig, icon = "NEUTRAL", "⚪"

    print(f" {icon} SIGNAL: {sig} (prob={w_dir:.3f})")
    print(f" 📊 WEIGHTED RETURN: {w_ret:+.4f}%")
    print(f"{'='*80}")
|
|
|
|
|
|
|
|
|
|
|
|
|
def audit_leakage(X_train, X_test, train_df, test_df):
    """Verify there is NO data leakage between train and test.

    X_train, X_test:   feature matrices
    train_df, test_df: frames with 'date' and 'link' columns

    Checks that (1) every training date precedes every test date, (2) no
    link occurs in both frames, (3) no feature name contains
    next/future/target/label, and (4) both matrices have the same
    columns. Also prints train/test means for the first ten features.
    Returns True when all four checks pass.
    """
    def mark(ok):
        return '✓' if ok else '✗'

    print(f"\n{'='*60}")
    print(f" DATA LEAKAGE AUDIT")
    print(f"{'='*60}")

    # 1. Temporal ordering. With an empty frame there is nothing to
    #    compare (and date < NaN would raise), so the check passes.
    train_max = train_df['date'].max()
    test_min = test_df['date'].min()
    if len(train_df) == 0 or len(test_df) == 0:
        ok1 = True
    else:
        ok1 = bool(train_max < test_min)
    print(f" [{mark(ok1)}] Train max date ({train_max}) < Test min date ({test_min})")

    # 2. No article in both sets.
    overlap = set(train_df['link']) & set(test_df['link'])
    ok2 = len(overlap) == 0
    print(f" [{mark(ok2)}] No shared articles (overlap={len(overlap)})")

    # 3. Feature names that hint at label information.
    suspicious = ('next', 'future', 'target', 'label')
    future_cols = [
        c for c in X_train.columns
        if any(word in c for word in suspicious)
    ]
    ok3 = len(future_cols) == 0
    print(f" [{mark(ok3)}] No future-looking feature names (found: {future_cols})")

    # 4. Identical feature columns, not merely the same count.
    ok4 = list(X_train.columns) == list(X_test.columns)
    print(f" [{mark(ok4)}] Same feature count: train={X_train.shape[1]} test={X_test.shape[1]}")

    # Informational only: a large train/test drift does not fail the
    # audit.
    print(f"\n Feature statistics comparison:")
    print(f" {'Feature':25s} {'Train Mean':>12s} {'Test Mean':>12s} {'Ratio':>8s}")
    print(f" {'─'*57}")
    for col in X_train.columns[:10]:
        if col not in X_test.columns:
            continue
        tr_mean = X_train[col].mean()
        te_mean = X_test[col].mean()
        ratio = te_mean / tr_mean if abs(tr_mean) > 1e-6 else 0
        print(f" {col:25s} {tr_mean:12.4f} {te_mean:12.4f} {ratio:8.2f}")

    all_ok = ok1 and ok2 and ok3 and ok4
    print(f"\n {'✓ ALL CHECKS PASSED' if all_ok else '✗ LEAKAGE DETECTED'}")
    print(f"{'='*60}")
    return all_ok
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
    """Run the full pipeline: build the dataset, audit it, train the three
    models, print the ranked predictions, and save the model plus a CSV
    of the predictions.

    Returns (results, model).
    """
    TICKER = "^NSEI"
    TRAIN_DAYS = 120
    TEST_DAYS = 14

    print(f"\n{'#'*60}")
    print(f" LightGBM NEWS IMPACT MODEL")
    print(f" Ticker: {TICKER}")
    print(f" Train: {TRAIN_DAYS} days | Test: {TEST_DAYS} days")
    print(f"{'#'*60}")

    pipeline = DataPipeline(
        TICKER, train_days=TRAIN_DAYS, test_days=TEST_DAYS
    )
    (
        X_train, y_dir, y_hh, y_ret,
        X_test, test_df, price_df
    ) = await pipeline.build_dataset()

    # Audit against the real training rows when the pipeline exposes them
    # as a `train_df` attribute. build_dataset() does not return that
    # frame, so the fallback is a placeholder (every row dated the day
    # before the cutoff, synthetic links) that can only exercise the
    # feature-name and column checks.
    real_train = getattr(pipeline, 'train_df', None)
    if real_train is not None and {'date', 'link'} <= set(real_train.columns):
        train_df_audit = real_train[['date', 'link']].copy()
    else:
        cutoff = (datetime.now() - timedelta(days=TEST_DAYS)).date()
        train_df_audit = pd.DataFrame({
            'date': [cutoff - timedelta(days=1)] * len(X_train),
            'link': [f'train_{i}' for i in range(len(X_train))]
        })
    test_df_audit = test_df[['date', 'link']].copy()

    audit_leakage(X_train, X_test, train_df_audit, test_df_audit)

    model = StockNewsModel()
    results = model.train_and_evaluate(
        X_train, y_dir, y_hh, y_ret, X_test, test_df
    )

    display_predictions(results, TICKER, top_n=25)

    # "^" is stripped from the symbol for the output file names.
    file_stem = TICKER.replace("^", "")
    model.save(f'{file_stem}_model.pkl')

    out_file = (
        f'{file_stem}_predictions_'
        f'{datetime.now().strftime("%Y%m%d")}.csv'
    )
    wanted = [
        'title', 'link', 'timestamp', 'date',
        'pred_dir_label', 'pred_dir_prob',
        'pred_hh', 'pred_hh_prob',
        'pred_ret_pct', 'impact',
        'sent_combined', 'n_bull', 'n_bear',
    ]
    save_cols = [c for c in wanted if c in results.columns]
    results[save_cols].to_csv(out_file, index=False)
    print(f"\nSaved → {out_file}")

    print(f"\n{'#'*60}")
    print(f" FINAL SUMMARY")
    print(f"{'#'*60}")
    print(f" Train samples: {len(X_train)}")
    print(f" Test samples: {len(X_test)}")
    print(f" Features: {len(model.feature_names)}")
    print(f"\n CV Metrics:")
    for name, fold_means in model.metrics.items():
        print(f" {name}:")
        for key, value in fold_means.items():
            print(f" {key}: {value:.4f}")
    print(f"{'#'*60}")

    return results, model
|
|
|
|
|
|
|
|
|
|
|
|
|
# Run the pipeline only when executed as a script (or from a notebook,
# where __name__ is also "__main__"); importing this module no longer
# triggers a full scrape-and-train run.
if __name__ == "__main__":
    results, model = asyncio.run(main())