"""Real-Time Data Streaming - WebSocket feeds and live processing""" import numpy as np import pandas as pd import threading import time import json from typing import Dict, List, Optional, Callable from collections import deque from datetime import datetime, timedelta import warnings warnings.filterwarnings('ignore') class LiveDataBuffer: """Thread-safe circular buffer for real-time market data""" def __init__(self, max_len=10000): self.max_len = max_len self._data = {} self._lock = threading.Lock() def append(self, ticker, timestamp, open_p, high, low, close, volume, **kwargs): with self._lock: if ticker not in self._data: self._data[ticker] = deque(maxlen=self.max_len) self._data[ticker].append({ 'timestamp': timestamp, 'Open': open_p, 'High': high, 'Low': low, 'Close': close, 'Volume': volume, **kwargs }) def get_dataframe(self, ticker, last_n=None): with self._lock: if ticker not in self._data: return pd.DataFrame() data = list(self._data[ticker]) if last_n: data = data[-last_n:] return pd.DataFrame(data).set_index('timestamp') def latest(self, ticker): with self._lock: if ticker not in self._data or len(self._data[ticker]) == 0: return None return self._data[ticker][-1] class AlpacaStreamer: """Alpaca Markets WebSocket - free real-time IEX data""" def __init__(self, api_key, secret_key, buffer=None, paper=True): self.api_key = api_key self.secret_key = secret_key self.paper = paper self.buffer = buffer or LiveDataBuffer() self._running = False self._callbacks = [] def subscribe(self, tickers): self._running = True url = "wss://paper-api.alpaca.markets/stream" if self.paper else "wss://data.alpaca.markets/stream" def on_message(ws, message): data = json.loads(message) if data.get('T') in ('b', 't'): self.buffer.append( ticker=data['S'], timestamp=pd.Timestamp(data['t']), open_p=data.get('o', data.get('p', 0)), high=data.get('h', data.get('p', 0)), low=data.get('l', data.get('p', 0)), close=data.get('c', data.get('p', 0)), volume=data.get('v', data.get('s', 0)), vwap=data.get('vw', data.get('p', 0)) ) for cb in self._callbacks: cb(data) def on_open(ws): ws.send(json.dumps({"action": "auth", "key": self.api_key, "secret": self.secret_key})) time.sleep(1) for t in tickers: ws.send(json.dumps({"action": "subscribe", "bars": [t]})) import websocket self._ws = websocket.WebSocketApp(url, on_open=on_open, on_message=on_message) self._thread = threading.Thread(target=self._ws.run_forever, daemon=True) self._thread.start() def on_data(self, callback): self._callbacks.append(callback) def unsubscribe(self): self._running = False if hasattr(self, '_ws'): self._ws.close() class NewsStreamAggregator: """Aggregate news from Yahoo Finance + FMP + custom RSS""" def __init__(self, tickers, sentiment_model=None): self.tickers = tickers self.sentiment_model = sentiment_model self.news_buffer = deque(maxlen=5000) self._running = False def fetch_yahoo_news(self, ticker): import yfinance as yf try: tk = yf.Ticker(ticker) news = tk.news or [] articles = [] for item in news: articles.append({ 'ticker': ticker, 'timestamp': pd.Timestamp(item.get('publish_time', 0), unit='s') if 'publish_time' in item else pd.Timestamp.now(), 'title': item.get('title', ''), 'source': item.get('publisher', 'yahoo'), 'text': item.get('title', ''), }) return articles except Exception as e: return [] def fetch_fmp_news(self, api_key='demo'): import requests try: url = "https://financialmodelingprep.com/api/v3/stock_news" resp = requests.get(url, params={'tickers': ','.join(self.tickers), 'limit': 100, 'apikey': api_key}, timeout=10) if resp.status_code == 200: articles = [] for item in resp.json(): articles.append({ 'ticker': item.get('symbol', ''), 'timestamp': pd.Timestamp(item.get('publishedDate', '')), 'title': item.get('title', ''), 'source': 'fmp', 'text': item.get('text', item.get('title', '')), }) return articles except: pass return [] def process_sentiment(self, articles): df = pd.DataFrame(articles) if len(df) == 0 or self.sentiment_model is None: if len(df) > 0: df['sentiment_score'] = 0.0 return df texts = (df['title'].fillna('') + ' ' + df.get('text', pd.Series(['']*len(df))).fillna('')).tolist() sentiments = self.sentiment_model.analyze_batch(texts) df['sentiment_score'] = [s['sentiment_score'] for s in sentiments] df['sentiment_label'] = [s['label'] for s in sentiments] return df def start_streaming(self, poll_seconds=300): self._running = True def poll(): while self._running: all_articles = [] for t in self.tickers: all_articles.extend(self.fetch_yahoo_news(t)) all_articles.extend(self.fetch_fmp_news()) if all_articles: df = self.process_sentiment(all_articles) for _, row in df.iterrows(): self.news_buffer.append(row.to_dict()) time.sleep(poll_seconds) self._thread = threading.Thread(target=poll, daemon=True) self._thread.start() def stop(self): self._running = False def get_latest_sentiment(self, ticker, hours=24): if len(self.news_buffer) == 0: return pd.DataFrame() df = pd.DataFrame(list(self.news_buffer)) if 'ticker' not in df.columns: return df cutoff = pd.Timestamp.now() - timedelta(hours=hours) return df[(df['ticker'] == ticker) & (df['timestamp'] > cutoff)] class OrderFlowEstimator: """Estimate order flow imbalance from tick data""" def __init__(self, lookback=1000): self.lookback = lookback self._trades = {} def process_trade(self, ticker, price, volume, timestamp=None, side='auto'): if ticker not in self._trades: self._trades[ticker] = deque(maxlen=self.lookback) if side == 'auto' and len(self._trades[ticker]) > 0: last_price = self._trades[ticker][-1]['price'] side = 'buy' if price > last_price else ('sell' if price < last_price else 'neutral') self._trades[ticker].append({'price': price, 'volume': volume, 'timestamp': timestamp or pd.Timestamp.now(), 'side': side}) def get_imbalance(self, ticker, window=100): if ticker not in self._trades: return {'ofi': 0, 'buy_volume': 0, 'sell_volume': 0} df = pd.DataFrame(list(self._trades[ticker])[-window:]) buy_vol = df[df['side'] == 'buy']['volume'].sum() sell_vol = df[df['side'] == 'sell']['volume'].sum() total = buy_vol + sell_vol ofi = (buy_vol - sell_vol) / total if total > 0 else 0 return {'ofi': ofi, 'buy_volume': int(buy_vol), 'sell_volume': int(sell_vol)} class RealtimeFeatureEngine: """Real-time feature computation pipeline""" def __init__(self, tickers, data_source='yahoo', api_key='', secret_key='', include_sentiment=True, sentiment_model=None): self.tickers = tickers self.buffer = LiveDataBuffer(max_len=50000) self.order_flow = OrderFlowEstimator() self.news = NewsStreamAggregator(tickers, sentiment_model) self._source = data_source self._api_key = api_key self._secret_key = secret_key def start(self, interval='1m', poll_seconds=60): self.news.start_streaming() if self._source == 'alpaca': self.streamer = AlpacaStreamer(self._api_key, self._secret_key, self.buffer) self.streamer.subscribe(self.tickers) else: import yfinance as yf def poll(): while True: for t in self.tickers: try: hist = yf.Ticker(t).history(period='1d', interval=interval) for idx, row in hist.iterrows(): self.buffer.append(t, idx, row['Open'], row['High'], row['Low'], row['Close'], row['Volume']) except: pass time.sleep(poll_seconds) self._thread = threading.Thread(target=poll, daemon=True) self._thread.start() def stop(self): if hasattr(self, 'streamer') and hasattr(self.streamer, 'unsubscribe'): self.streamer.unsubscribe() self.news.stop() def get_latest(self, ticker, lookback=60): return self.buffer.get_dataframe(ticker, last_n=lookback) def get_sentiment(self, ticker): return self.news.get_latest_sentiment(ticker)