| """Real-Time Data Streaming - WebSocket feeds and live processing""" |
import json
import logging
import threading
import time
import warnings
from collections import deque
from datetime import datetime, timedelta
from itertools import islice
from typing import Callable, Dict, List, Optional

import numpy as np
import pandas as pd
# NOTE(review): this runs at import time and silences *every* warning for the
# whole process (including deprecation warnings from unrelated libraries).
# Consider scoping suppression with warnings.catch_warnings() where needed.
warnings.filterwarnings('ignore')
|
|
|
|
class LiveDataBuffer:
    """Thread-safe, per-ticker bounded buffer for real-time OHLCV bars.

    Each ticker gets its own ``deque(maxlen=max_len)``; once full, appending
    a new bar drops the oldest one. All access to the internal dict is
    serialized through a single ``threading.Lock``.
    """

    def __init__(self, max_len=10000):
        self.max_len = max_len
        self._data = {}  # ticker -> deque of bar dicts, oldest first
        self._lock = threading.Lock()

    def append(self, ticker, timestamp, open_p, high, low, close, volume, **kwargs):
        """Store one bar for *ticker*.

        Extra keyword arguments (e.g. ``vwap``) become additional columns.
        They are merged last, so a kwarg named like a standard column
        overrides it.
        """
        record = {
            'timestamp': timestamp, 'Open': open_p, 'High': high,
            'Low': low, 'Close': close, 'Volume': volume, **kwargs,
        }
        with self._lock:
            series = self._data.get(ticker)
            if series is None:
                series = self._data[ticker] = deque(maxlen=self.max_len)
            series.append(record)

    def get_dataframe(self, ticker, last_n=None):
        """Return buffered bars for *ticker* as a DataFrame indexed by timestamp.

        Args:
            ticker: Symbol to look up.
            last_n: When a positive int, only the most recent ``last_n`` bars
                are returned; ``None`` or ``0`` returns everything buffered.

        Returns:
            Rows ordered oldest first, indexed by ``timestamp``. An empty
            DataFrame when nothing is buffered for *ticker* (including a
            ticker whose deque is empty, e.g. ``max_len=0``).
        """
        with self._lock:
            series = self._data.get(ticker)
            if not series:
                return pd.DataFrame()
            if last_n and last_n > 0:
                # Walk from the newest end so a small tail of a large buffer
                # costs O(last_n) instead of copying the whole deque.
                rows = list(islice(reversed(series), last_n))
                rows.reverse()
            else:
                rows = list(series)
        # DataFrame construction happens outside the lock: ``rows`` is a copy.
        return pd.DataFrame(rows).set_index('timestamp')

    def latest(self, ticker):
        """Return a copy of the most recent bar dict for *ticker*, or ``None``."""
        with self._lock:
            series = self._data.get(ticker)
            if not series:
                return None
            # Copy so callers cannot mutate the buffered record in place.
            return dict(series[-1])
|
|
|
|
class AlpacaStreamer:
    """Alpaca Markets real-time market-data WebSocket (free IEX feed by default).

    Connects to the Alpaca market-data v2 stream, authenticates, subscribes
    to bars for the requested tickers, writes every bar (``T == 'b'``) or
    trade (``T == 't'``) event into a ``LiveDataBuffer`` and forwards the
    event dict to callbacks registered with :meth:`on_data`.
    """

    # Market-data v2 stream; ``{feed}`` is e.g. ``iex`` (free) or ``sip``.
    DATA_STREAM_URL = "wss://stream.data.alpaca.markets/v2/{feed}"

    _log = logging.getLogger(__name__)

    def __init__(self, api_key, secret_key, buffer=None, paper=True, feed='iex'):
        """Create a streamer; no connection is opened until :meth:`subscribe`.

        Args:
            api_key: Alpaca API key id.
            secret_key: Alpaca API secret.
            buffer: Destination buffer; a fresh ``LiveDataBuffer`` if omitted.
            paper: Retained for backward compatibility only. It no longer
                selects the URL: the previous "paper" URL was the trading
                updates stream, which never delivers market bars.
            feed: Feed name substituted into ``DATA_STREAM_URL``.
        """
        self.api_key = api_key
        self.secret_key = secret_key
        self.paper = paper
        self.feed = feed
        self.buffer = buffer or LiveDataBuffer()
        self._running = False
        self._callbacks = []
        self._tickers = []

    def _handle_message(self, message):
        """Parse one raw frame, buffer bar/trade events and fire callbacks.

        The v2 stream delivers a JSON *array* of events per frame; a single
        JSON object is accepted too. Returns the list of event dicts so the
        caller can react to control messages (auth results, errors).
        """
        payload = json.loads(message)
        candidates = payload if isinstance(payload, list) else [payload]
        events = [e for e in candidates if isinstance(e, dict)]
        for event in events:
            if event.get('T') not in ('b', 't'):
                continue  # control/subscription messages carry no market data
            # Bars carry o/h/l/c/v/vw; trades carry only a price ``p`` and size ``s``.
            price = event.get('p', 0)
            self.buffer.append(
                ticker=event['S'], timestamp=pd.Timestamp(event['t']),
                open_p=event.get('o', price),
                high=event.get('h', price),
                low=event.get('l', price),
                close=event.get('c', price),
                volume=event.get('v', event.get('s', 0)),
                vwap=event.get('vw', price),
            )
            for cb in self._callbacks:
                cb(event)
        return events

    def subscribe(self, tickers):
        """Open the stream on a daemon thread and subscribe to bars for *tickers*."""
        import websocket  # websocket-client; imported lazily so the module loads without it

        self._running = True
        self._tickers = list(tickers)
        url = self.DATA_STREAM_URL.format(feed=self.feed)

        def on_open(ws):
            ws.send(json.dumps({"action": "auth", "key": self.api_key, "secret": self.secret_key}))

        def on_message(ws, message):
            for event in self._handle_message(message):
                kind = event.get('T')
                if kind == 'success' and event.get('msg') == 'authenticated':
                    # Subscribe only once the server has acknowledged auth,
                    # instead of sleeping on the socket thread and hoping.
                    ws.send(json.dumps({"action": "subscribe", "bars": self._tickers}))
                elif kind == 'error':
                    self._log.warning("Alpaca stream error %s: %s",
                                      event.get('code'), event.get('msg'))

        self._ws = websocket.WebSocketApp(url, on_open=on_open, on_message=on_message)
        self._thread = threading.Thread(target=self._ws.run_forever, daemon=True)
        self._thread.start()

    def on_data(self, callback):
        """Register ``callback(event_dict)`` to run for every bar/trade event."""
        self._callbacks.append(callback)

    def unsubscribe(self):
        """Close the WebSocket if one was opened."""
        self._running = False
        ws = getattr(self, '_ws', None)
        if ws is not None:
            ws.close()
|
|
|
|
class NewsStreamAggregator:
    """Aggregate news headlines from Yahoo Finance and FMP, with optional sentiment.

    :meth:`start_streaming` polls both sources on a daemon thread and appends
    de-duplicated article dicts (keys ``ticker``, ``timestamp``, ``title``,
    ``source``, ``text`` plus sentiment columns) to ``news_buffer``.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, tickers, sentiment_model=None):
        self.tickers = tickers
        self.sentiment_model = sentiment_model
        self.news_buffer = deque(maxlen=5000)
        self._running = False
        # Lets stop() interrupt the sleep between polls immediately.
        self._stop_event = threading.Event()

    @staticmethod
    def _utc_now():
        """Current time as a tz-naive UTC Timestamp."""
        return pd.Timestamp.now(tz='UTC').tz_localize(None)

    @staticmethod
    def _to_timestamp(value, unit=None):
        """Parse *value* to a tz-naive Timestamp; NaT when missing/unparseable.

        tz-aware inputs are converted to UTC and then made naive, so values
        from different sources can be compared. Naive inputs are kept as-is.
        """
        if value is None or (isinstance(value, str) and not value):
            return pd.NaT
        try:
            ts = pd.Timestamp(value, unit=unit) if unit else pd.Timestamp(value)
        except (ValueError, TypeError, OverflowError):
            return pd.NaT
        if pd.isna(ts):
            return pd.NaT
        if ts.tzinfo is not None:
            ts = ts.tz_convert('UTC').tz_localize(None)
        return ts

    @classmethod
    def _yahoo_article(cls, ticker, item):
        """Normalize one ``yfinance`` news item into the article dict schema.

        Accepts the flat layout (``title``, ``publisher``, epoch-seconds
        ``providerPublishTime``) and a nested layout where the fields live
        under ``item['content']`` (``title``, ISO ``pubDate``,
        ``provider.displayName``). Falls back to "now" (UTC) when no publish
        time can be parsed.
        """
        content = item.get('content')
        content = content if isinstance(content, dict) else {}
        provider = content.get('provider')
        provider = provider if isinstance(provider, dict) else {}

        title = item.get('title') or content.get('title') or ''
        epoch = item.get('providerPublishTime', item.get('publish_time'))
        if epoch is not None:
            ts = cls._to_timestamp(epoch, unit='s')  # epoch seconds -> naive UTC
        else:
            ts = cls._to_timestamp(content.get('pubDate'))
        if pd.isna(ts):
            ts = cls._utc_now()
        return {
            'ticker': ticker,
            'timestamp': ts,
            'title': title,
            'source': item.get('publisher') or provider.get('displayName') or 'yahoo',
            'text': title,
        }

    @staticmethod
    def _article_key(article):
        """Identity used to drop articles that are already buffered."""
        return (article.get('ticker'), article.get('title'))

    def fetch_yahoo_news(self, ticker):
        """Fetch recent headlines for *ticker* via yfinance; ``[]`` on failure."""
        import yfinance as yf
        try:
            news = yf.Ticker(ticker).news or []
        except Exception:  # network/parse failures: best effort, keep polling
            self._log.warning("Yahoo news fetch failed for %s", ticker, exc_info=True)
            return []
        return [self._yahoo_article(ticker, item) for item in news if isinstance(item, dict)]

    def fetch_fmp_news(self, api_key='demo'):
        """Fetch up to 100 headlines for all tickers from FMP; ``[]`` on failure."""
        import requests
        url = "https://financialmodelingprep.com/api/v3/stock_news"
        try:
            resp = requests.get(url, params={'tickers': ','.join(self.tickers), 'limit': 100,
                                             'apikey': api_key}, timeout=10)
            if resp.status_code != 200:
                self._log.warning("FMP news request failed with HTTP %s", resp.status_code)
                return []
            items = resp.json()
        except (requests.RequestException, ValueError):
            self._log.warning("FMP news fetch failed", exc_info=True)
            return []
        if not isinstance(items, list):
            return []  # non-list payload (e.g. an error object): nothing to parse
        return [
            {
                'ticker': item.get('symbol', ''),
                # NOTE(review): publishedDate is parsed as given (naive); its
                # timezone is not established here — confirm against FMP docs.
                'timestamp': self._to_timestamp(item.get('publishedDate')),
                'title': item.get('title', ''),
                'source': 'fmp',
                'text': item.get('text', item.get('title', '')),
            }
            for item in items if isinstance(item, dict)
        ]

    def process_sentiment(self, articles):
        """Return *articles* as a DataFrame with sentiment columns added.

        Without a ``sentiment_model`` every non-empty frame gets a neutral
        ``sentiment_score`` of 0.0. With a model, ``title + ' ' + text`` for
        each row is passed to ``sentiment_model.analyze_batch``, which is
        expected to return one dict per text with ``sentiment_score`` and
        ``label`` keys.
        """
        df = pd.DataFrame(articles)
        if len(df) == 0:
            return df
        if self.sentiment_model is None:
            df['sentiment_score'] = 0.0
            return df
        body = df['text'] if 'text' in df.columns else pd.Series([''] * len(df), index=df.index)
        texts = (df['title'].fillna('') + ' ' + body.fillna('')).tolist()
        sentiments = self.sentiment_model.analyze_batch(texts)
        df['sentiment_score'] = [s['sentiment_score'] for s in sentiments]
        df['sentiment_label'] = [s['label'] for s in sentiments]
        return df

    def _poll_once(self):
        """Fetch all sources once and append not-yet-buffered articles."""
        articles = []
        for t in self.tickers:
            articles.extend(self.fetch_yahoo_news(t))
        articles.extend(self.fetch_fmp_news())

        # Each poll returns mostly the same headlines; skip ones already stored.
        seen = {self._article_key(a) for a in list(self.news_buffer)}
        fresh = []
        for article in articles:
            key = self._article_key(article)
            if key not in seen:
                seen.add(key)
                fresh.append(article)
        if not fresh:
            return
        for row in self.process_sentiment(fresh).to_dict('records'):
            self.news_buffer.append(row)

    def start_streaming(self, poll_seconds=300):
        """Poll all sources every *poll_seconds* on a daemon thread until :meth:`stop`."""
        self._running = True
        self._stop_event.clear()

        def poll():
            while self._running:
                try:
                    self._poll_once()
                except Exception:
                    # Keep the poller alive; a failed cycle is retried next time.
                    self._log.exception("News poll failed")
                # wait() returns True as soon as stop() sets the event.
                if self._stop_event.wait(poll_seconds):
                    break

        self._thread = threading.Thread(target=poll, daemon=True)
        self._thread.start()

    def stop(self):
        """Ask the polling thread to exit; interrupts its current sleep."""
        self._running = False
        self._stop_event.set()

    def get_latest_sentiment(self, ticker, hours=24):
        """Return buffered articles for *ticker* newer than *hours* hours ago.

        Timestamps are compared as tz-naive UTC (tz-aware values are
        converted first), so mixed naive/aware entries no longer raise.
        """
        if len(self.news_buffer) == 0:
            return pd.DataFrame()
        df = pd.DataFrame(list(self.news_buffer))
        if 'ticker' not in df.columns or 'timestamp' not in df.columns:
            return df
        stamps = df['timestamp'].map(self._to_timestamp)
        cutoff = self._utc_now() - timedelta(hours=hours)
        return df[(df['ticker'] == ticker) & (stamps > cutoff)]
|
|
|
|
class OrderFlowEstimator:
    """Estimate order-flow imbalance (OFI) from a stream of trades.

    Trades submitted with ``side='auto'`` are classified by a simple tick
    test against the previous trade: up-tick -> ``'buy'``, down-tick ->
    ``'sell'``, unchanged price (or no previous trade) -> ``'neutral'``.
    """

    def __init__(self, lookback=1000):
        self.lookback = lookback
        self._trades = {}  # ticker -> deque of trade dicts, oldest first

    def process_trade(self, ticker, price, volume, timestamp=None, side='auto'):
        """Record one trade, inferring its side when ``side == 'auto'``."""
        trades = self._trades.get(ticker)
        if trades is None:
            trades = self._trades[ticker] = deque(maxlen=self.lookback)
        if side == 'auto':
            if trades:
                last_price = trades[-1]['price']
                side = 'buy' if price > last_price else ('sell' if price < last_price else 'neutral')
            else:
                # Nothing to compare against; previously the literal 'auto' was stored.
                side = 'neutral'
        trades.append({'price': price, 'volume': volume,
                       'timestamp': timestamp or pd.Timestamp.now(), 'side': side})

    def get_imbalance(self, ticker, window=100):
        """Summarize buy/sell volume over the last *window* trades of *ticker*.

        Returns:
            ``{'ofi', 'buy_volume', 'sell_volume'}`` where
            ``ofi = (buy - sell) / (buy + sell)`` (0 when no directional
            volume) and volumes are truncated to ``int``. A ``window`` of 0
            covers every stored trade (same slicing as before).
        """
        trades = self._trades.get(ticker)
        if not trades:
            # Unknown ticker, or an empty deque (e.g. lookback=0).
            return {'ofi': 0, 'buy_volume': 0, 'sell_volume': 0}
        recent = list(trades)[-window:]
        buy_vol = sum(t['volume'] for t in recent if t['side'] == 'buy')
        sell_vol = sum(t['volume'] for t in recent if t['side'] == 'sell')
        total = buy_vol + sell_vol
        ofi = (buy_vol - sell_vol) / total if total > 0 else 0
        return {'ofi': ofi, 'buy_volume': int(buy_vol), 'sell_volume': int(sell_vol)}
|
|
|
|
class RealtimeFeatureEngine:
    """Real-time feature computation pipeline.

    Wires together a ``LiveDataBuffer`` (price bars), an
    ``OrderFlowEstimator`` and a ``NewsStreamAggregator``. Bars come from an
    ``AlpacaStreamer`` when ``data_source == 'alpaca'``; any other value polls
    ``yfinance`` intraday history on a daemon thread.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, tickers, data_source='yahoo', api_key='', secret_key='',
                 include_sentiment=True, sentiment_model=None):
        """
        Args:
            tickers: Symbols to track.
            data_source: ``'alpaca'`` or anything else for yfinance polling.
            api_key / secret_key: Alpaca credentials (alpaca source only).
            include_sentiment: When False, :meth:`start` does not launch the
                news/sentiment poller (this flag used to be ignored).
            sentiment_model: Passed through to ``NewsStreamAggregator``.
        """
        self.tickers = tickers
        self.buffer = LiveDataBuffer(max_len=50000)
        self.order_flow = OrderFlowEstimator()
        self.news = NewsStreamAggregator(tickers, sentiment_model)
        self.include_sentiment = include_sentiment
        self._source = data_source
        self._api_key = api_key
        self._secret_key = secret_key
        # Set by stop() to end the yfinance polling loop.
        self._stop_event = threading.Event()

    def _ingest_history(self, ticker, hist):
        """Append only rows of *hist* newer than the last bar buffered for *ticker*.

        Each poll re-downloads the whole day, so without this every cycle
        would duplicate all earlier bars. Returns the number of rows added.
        NOTE(review): if the provider's final row is a still-forming bar, it
        is stored once and not refreshed later — confirm whether that matters.
        """
        if hist is None or hist.empty:
            return 0
        last = self.buffer.latest(ticker)
        if last is not None:
            hist = hist[hist.index > last['timestamp']]
        for idx, row in hist.iterrows():
            self.buffer.append(ticker, idx, row['Open'], row['High'], row['Low'],
                               row['Close'], row['Volume'])
        return len(hist)

    def start(self, interval='1m', poll_seconds=60):
        """Start news (if enabled) and price ingestion in the background."""
        if self.include_sentiment:
            self.news.start_streaming()
        if self._source == 'alpaca':
            self.streamer = AlpacaStreamer(self._api_key, self._secret_key, self.buffer)
            self.streamer.subscribe(self.tickers)
            return

        import yfinance as yf

        self._stop_event.clear()

        def poll():
            while True:
                for t in self.tickers:
                    try:
                        hist = yf.Ticker(t).history(period='1d', interval=interval)
                        self._ingest_history(t, hist)
                    except Exception:
                        # Best effort: one failing ticker must not kill the poller.
                        self._log.warning("yfinance poll failed for %s", t, exc_info=True)
                # wait() returns True as soon as stop() sets the event.
                if self._stop_event.wait(poll_seconds):
                    break

        self._thread = threading.Thread(target=poll, daemon=True)
        self._thread.start()

    def stop(self):
        """Stop the Alpaca stream or yfinance poller, and the news poller."""
        streamer = getattr(self, 'streamer', None)
        if streamer is not None and hasattr(streamer, 'unsubscribe'):
            streamer.unsubscribe()
        self._stop_event.set()
        self.news.stop()

    def get_latest(self, ticker, lookback=60):
        """Return the last *lookback* buffered bars for *ticker* as a DataFrame."""
        return self.buffer.get_dataframe(ticker, last_n=lookback)

    def get_sentiment(self, ticker):
        """Return the last 24h of buffered news rows for *ticker*."""
        return self.news.get_latest_sentiment(ticker)