alphaforge-quant-system / realtime_data.py
Premchan369's picture
Upload realtime_data.py
90db8a6 verified
raw
history blame
10 kB
"""Real-Time Data Streaming - WebSocket feeds and live processing"""
import numpy as np
import pandas as pd
import threading
import time
import json
from typing import Dict, List, Optional, Callable
from collections import deque
from datetime import datetime, timedelta
import warnings
# NOTE(review): this silences *all* Python warnings for the whole process as a
# side effect of importing this module (not only warnings raised here).
# Consider scoping it with warnings.catch_warnings() around the noisy calls.
warnings.filterwarnings('ignore')
class LiveDataBuffer:
    """Thread-safe circular buffer for real-time market data.

    Keeps, per ticker, at most ``max_len`` of the most recent records in a
    ``deque``; the oldest record is dropped automatically once it is full.
    Every access to the internal storage happens under one lock, so producer
    and consumer threads can share a single instance.
    """

    def __init__(self, max_len=10000):
        self.max_len = max_len
        self._data = {}  # ticker -> deque of record dicts, oldest first
        self._lock = threading.Lock()

    def append(self, ticker, timestamp, open_p, high, low, close, volume, **kwargs):
        """Append one bar/tick for *ticker*.

        Extra keyword arguments (e.g. ``vwap``) are stored as additional
        fields of the record and become extra DataFrame columns.
        """
        record = {
            'timestamp': timestamp, 'Open': open_p, 'High': high,
            'Low': low, 'Close': close, 'Volume': volume, **kwargs
        }
        with self._lock:
            series = self._data.get(ticker)
            if series is None:
                series = self._data[ticker] = deque(maxlen=self.max_len)
            series.append(record)

    def get_dataframe(self, ticker, last_n=None):
        """Return buffered records for *ticker* as a DataFrame indexed by ``timestamp``.

        Args:
            ticker: symbol to look up.
            last_n: when a positive int, only the newest ``last_n`` records are
                returned; ``None`` or a non-positive value returns the full
                buffered history.

        Returns:
            A DataFrame with ``Open/High/Low/Close/Volume`` plus any extra
            fields, or an empty DataFrame when nothing is buffered for *ticker*.
        """
        from itertools import islice

        with self._lock:
            series = self._data.get(ticker)
            if not series:
                return pd.DataFrame()
            if last_n and last_n > 0:
                # Walk back from the newest end so only last_n records are
                # copied while the lock is held, not the whole deque.
                rows = list(islice(reversed(series), last_n))
                rows.reverse()
            else:
                rows = list(series)
        return pd.DataFrame(rows).set_index('timestamp')

    def latest(self, ticker):
        """Return a copy of the newest record for *ticker*, or ``None`` if there is none.

        A copy is returned so callers cannot mutate buffered data outside the lock.
        """
        with self._lock:
            series = self._data.get(ticker)
            if not series:
                return None
            return dict(series[-1])
class AlpacaStreamer:
    """Alpaca Markets WebSocket - free real-time IEX data.

    Connects to Alpaca's v2 market-data stream, authenticates, subscribes to
    bar updates for the requested tickers and writes every bar ('b') or trade
    ('t') event into a LiveDataBuffer. Registered callbacks receive every
    decoded stream event as a dict.
    """

    # v2 market-data stream; ``feed`` is 'iex' (free) or 'sip'. Alpaca serves
    # market data from this endpoint for both paper and live keys.
    DATA_STREAM_URL = "wss://stream.data.alpaca.markets/v2/{feed}"

    def __init__(self, api_key, secret_key, buffer=None, paper=True, feed='iex'):
        self.api_key = api_key
        self.secret_key = secret_key
        # Kept for API compatibility; the market-data URL does not depend on it.
        self.paper = paper
        self.feed = feed
        self.buffer = buffer if buffer is not None else LiveDataBuffer()
        self._running = False
        self._callbacks = []
        self._tickers = []
        self._ws = None
        self._thread = None

    def subscribe(self, tickers):
        """Stream bars for *tickers* from a background daemon thread.

        Credentials are sent on connect. The subscription request is sent only
        after the server confirms authentication (see ``_handle_message``),
        instead of after a fixed sleep. Calling this again first closes the
        previous connection.
        """
        import websocket  # third-party ``websocket-client``; imported lazily
        self.unsubscribe()
        self._tickers = list(tickers)
        self._running = True
        url = self.DATA_STREAM_URL.format(feed=self.feed)

        def on_open(ws):
            ws.send(json.dumps({"action": "auth", "key": self.api_key, "secret": self.secret_key}))

        def on_message(ws, message):
            self._handle_message(ws, message)

        self._ws = websocket.WebSocketApp(url, on_open=on_open, on_message=on_message)
        self._thread = threading.Thread(target=self._ws.run_forever, daemon=True)
        self._thread.start()

    def _handle_message(self, ws, message):
        """Decode one websocket frame and dispatch every event it contains."""
        payload = json.loads(message)
        # The v2 stream batches events into a JSON array; tolerate a bare object too.
        events = payload if isinstance(payload, list) else [payload]
        for event in events:
            if not isinstance(event, dict):
                continue
            kind = event.get('T')
            if kind == 'success' and event.get('msg') == 'authenticated':
                # Subscribe only once the server has accepted the credentials.
                if self._tickers:
                    ws.send(json.dumps({"action": "subscribe", "bars": self._tickers}))
            elif kind in ('b', 't'):
                self._record_event(event)
            for cb in self._callbacks:
                cb(event)

    def _record_event(self, event):
        """Store a bar ('b') or trade ('t') event in the buffer.

        A trade carries a single price ``p`` and size ``s``. It is stored as a
        degenerate bar whose OHLC and VWAP all equal the trade price.
        """
        price = event.get('p', 0)
        self.buffer.append(
            ticker=event['S'], timestamp=pd.Timestamp(event['t']),
            open_p=event.get('o', price),
            high=event.get('h', price),
            low=event.get('l', price),
            close=event.get('c', price),
            volume=event.get('v', event.get('s', 0)),
            vwap=event.get('vw', price),
        )

    def on_data(self, callback):
        """Register *callback*; it is called with every decoded stream event."""
        self._callbacks.append(callback)

    def unsubscribe(self):
        """Stop streaming and close the websocket, if one is open."""
        self._running = False
        ws = getattr(self, '_ws', None)
        if ws is not None:
            ws.close()
            self._ws = None
class NewsStreamAggregator:
    """Aggregate news from Yahoo Finance and Financial Modeling Prep (FMP).

    A background thread polls both sources. It skips articles that are already
    buffered, scores the new ones with the optional sentiment model and keeps
    them in the bounded ``news_buffer``. Timestamps are normalised to
    timezone-naive UTC so articles from different sources can be compared.
    """

    def __init__(self, tickers, sentiment_model=None):
        self.tickers = tickers
        self.sentiment_model = sentiment_model
        self.news_buffer = deque(maxlen=5000)
        self._running = False
        self._thread = None
        # A fresh Event is created on every start_streaming(), so stop() can
        # wake a sleeping poller and a later restart cannot revive it.
        self._stop_event = threading.Event()
        self._stop_event.set()

    @staticmethod
    def _utc_now():
        """Current time as a timezone-naive UTC Timestamp."""
        return pd.Timestamp.now(tz='UTC').tz_localize(None)

    @staticmethod
    def _to_naive_utc(value, unit=None):
        """Parse *value* into a timezone-naive UTC Timestamp (``NaT`` if unparseable).

        Tz-aware values are converted to UTC. Naive values are assumed to be
        UTC already. NOTE(review): confirm FMP's ``publishedDate`` timezone.
        """
        ts = pd.to_datetime(value, unit=unit, errors='coerce')
        if pd.isna(ts):
            return pd.NaT
        if ts.tzinfo is not None:
            ts = ts.tz_convert('UTC').tz_localize(None)
        return ts

    @classmethod
    def _parse_yahoo_item(cls, ticker, item):
        """Convert one ``yfinance`` news item into an article dict.

        Accepts the legacy flat schema (``title``, ``publisher``,
        ``providerPublishTime`` in epoch seconds). Also accepts the newer schema
        that nests fields under ``content`` (``title``, ``provider.displayName``,
        ``pubDate`` as an ISO string).
        """
        content = item.get('content')
        if not isinstance(content, dict):
            content = item
        title = content.get('title') or ''
        provider = content.get('provider')
        if isinstance(provider, dict) and provider.get('displayName'):
            source = provider['displayName']
        else:
            source = content.get('publisher', 'yahoo')
        epoch = content.get('providerPublishTime', content.get('publish_time'))
        if epoch is not None:
            timestamp = cls._to_naive_utc(epoch, unit='s')
        else:
            timestamp = cls._to_naive_utc(content.get('pubDate'))
        if pd.isna(timestamp):
            # No usable publish time: fall back to "first seen" (now, UTC).
            timestamp = cls._utc_now()
        return {
            'ticker': ticker,
            'timestamp': timestamp,
            'title': title,
            'source': source,
            'text': title,
        }

    def fetch_yahoo_news(self, ticker):
        """Fetch recent Yahoo Finance headlines for *ticker*; best effort, ``[]`` on failure."""
        import yfinance as yf
        try:
            news = yf.Ticker(ticker).news or []
            return [self._parse_yahoo_item(ticker, item) for item in news]
        except Exception:
            # Best-effort source: network or schema problems yield no articles.
            return []

    def fetch_fmp_news(self, api_key='demo'):
        """Fetch up to 100 FMP news items for ``self.tickers``; best effort, ``[]`` on failure."""
        import requests
        url = "https://financialmodelingprep.com/api/v3/stock_news"
        params = {'tickers': ','.join(self.tickers), 'limit': 100, 'apikey': api_key}
        try:
            resp = requests.get(url, params=params, timeout=10)
            if resp.status_code != 200:
                return []
            items = resp.json()
        except (requests.RequestException, ValueError):
            return []
        if not isinstance(items, list):
            # A non-list payload (e.g. an error object) carries no articles.
            return []
        articles = []
        for item in items:
            if not isinstance(item, dict):
                continue
            articles.append({
                'ticker': item.get('symbol', ''),
                # Coerce per item so one bad date no longer discards the whole batch.
                'timestamp': self._to_naive_utc(item.get('publishedDate')),
                'title': item.get('title', ''),
                'source': 'fmp',
                'text': item.get('text', item.get('title', '')),
            })
        return articles

    def process_sentiment(self, articles):
        """Return *articles* as a DataFrame with sentiment columns.

        Without a sentiment model every article gets ``sentiment_score`` 0.0.
        With one, ``sentiment_model.analyze_batch(texts)`` is expected to return
        one dict per text with ``sentiment_score`` and ``label`` keys.
        """
        df = pd.DataFrame(articles)
        if len(df) == 0 or self.sentiment_model is None:
            if len(df) > 0:
                df['sentiment_score'] = 0.0
            return df
        texts = (df['title'].fillna('') + ' ' + df.get('text', pd.Series([''] * len(df))).fillna('')).tolist()
        sentiments = self.sentiment_model.analyze_batch(texts)
        df['sentiment_score'] = [s['sentiment_score'] for s in sentiments]
        df['sentiment_label'] = [s['label'] for s in sentiments]
        return df

    @staticmethod
    def _article_key(article):
        """Identity used to recognise the same article across polls."""
        return (article.get('ticker'), article.get('source'),
                article.get('title'), article.get('text'))

    def _ingest(self, articles):
        """Score and buffer the articles not already in ``news_buffer``; return how many were added."""
        seen = {self._article_key(a) for a in self.news_buffer}
        fresh = []
        for article in articles:
            key = self._article_key(article)
            if key in seen:
                continue
            seen.add(key)
            fresh.append({**article, 'timestamp': self._to_naive_utc(article.get('timestamp'))})
        if not fresh:
            return 0
        df = self.process_sentiment(fresh)
        self.news_buffer.extend(df.to_dict('records'))
        return len(fresh)

    def _poll_once(self):
        """Fetch every source once and buffer the new articles."""
        articles = []
        for t in self.tickers:
            articles.extend(self.fetch_yahoo_news(t))
        articles.extend(self.fetch_fmp_news())
        return self._ingest(articles)

    def _poll_loop(self, poll_seconds, stop_event):
        """Background loop: poll, then wait *poll_seconds* or until *stop_event* is set."""
        import logging
        log = logging.getLogger(__name__)
        while not stop_event.is_set():
            try:
                self._poll_once()
            except Exception:
                # Keep polling after transient failures (e.g. the sentiment model raising).
                log.exception("news poll failed")
            stop_event.wait(poll_seconds)

    def start_streaming(self, poll_seconds=300):
        """Poll all sources every *poll_seconds* in a daemon thread until :meth:`stop`.

        Calling this while already streaming is a no-op.
        """
        if self._thread is not None and self._thread.is_alive() and not self._stop_event.is_set():
            return
        self._running = True
        stop_event = self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._poll_loop, args=(poll_seconds, stop_event), daemon=True)
        self._thread.start()

    def stop(self):
        """Stop polling; the thread wakes immediately instead of finishing its sleep."""
        self._running = False
        self._stop_event.set()

    def get_latest_sentiment(self, ticker, hours=24):
        """Return buffered articles for *ticker* timestamped within the last *hours* hours."""
        if len(self.news_buffer) == 0:
            return pd.DataFrame()
        df = pd.DataFrame(list(self.news_buffer))
        if 'ticker' not in df.columns:
            return df
        # utc=True makes naive (treated as UTC) and tz-aware values comparable.
        published = pd.to_datetime(df['timestamp'], utc=True, errors='coerce')
        cutoff = pd.Timestamp.now(tz='UTC') - timedelta(hours=hours)
        return df[(df['ticker'] == ticker) & (published > cutoff)]
class OrderFlowEstimator:
    """Estimate order flow imbalance (OFI) from tick data with the tick rule.

    A trade is signed by comparing its price with the previous trade of the
    same ticker: an uptick is 'buy', a downtick is 'sell' and an unchanged
    price is 'neutral'. The first trade of a ticker has no reference price,
    so it is 'neutral' as well.
    """

    def __init__(self, lookback=1000):
        self.lookback = lookback
        self._trades = {}  # ticker -> deque of the last ``lookback`` trade dicts

    def process_trade(self, ticker, price, volume, timestamp=None, side='auto'):
        """Record one trade; pass ``side`` explicitly to bypass tick-rule classification."""
        trades = self._trades.get(ticker)
        if trades is None:
            trades = self._trades[ticker] = deque(maxlen=self.lookback)
        if side == 'auto':
            if trades:
                last_price = trades[-1]['price']
                side = 'buy' if price > last_price else ('sell' if price < last_price else 'neutral')
            else:
                # No previous trade, so the tick rule cannot sign this one;
                # previously the placeholder 'auto' leaked into the record.
                side = 'neutral'
        trades.append({'price': price, 'volume': volume,
                       'timestamp': timestamp or pd.Timestamp.now(), 'side': side})

    def get_imbalance(self, ticker, window=100):
        """Return OFI statistics over the last *window* trades of *ticker*.

        ``ofi = (buy_volume - sell_volume) / (buy_volume + sell_volume)``, or 0
        when there is no signed volume. A non-positive *window* covers no trades.
        Note that ``x[-0:]`` would otherwise select every trade.
        """
        trades = self._trades.get(ticker)
        if not trades or window <= 0:
            return {'ofi': 0, 'buy_volume': 0, 'sell_volume': 0}
        df = pd.DataFrame(list(trades)[-window:])
        buy_vol = df.loc[df['side'] == 'buy', 'volume'].sum()
        sell_vol = df.loc[df['side'] == 'sell', 'volume'].sum()
        total = buy_vol + sell_vol
        ofi = (buy_vol - sell_vol) / total if total > 0 else 0
        return {'ofi': ofi, 'buy_volume': int(buy_vol), 'sell_volume': int(sell_vol)}
class RealtimeFeatureEngine:
    """Real-time feature computation pipeline.

    Feeds OHLCV bars for ``tickers`` into a shared LiveDataBuffer, either from
    Alpaca's websocket (``data_source='alpaca'``) or by polling Yahoo Finance.
    When ``include_sentiment`` is true, it also polls news sentiment.
    """

    def __init__(self, tickers, data_source='yahoo', api_key='', secret_key='',
                 include_sentiment=True, sentiment_model=None):
        self.tickers = tickers
        self.buffer = LiveDataBuffer(max_len=50000)
        self.order_flow = OrderFlowEstimator()
        self.news = NewsStreamAggregator(tickers, sentiment_model)
        self._source = data_source
        self._api_key = api_key
        self._secret_key = secret_key
        self._include_sentiment = include_sentiment
        self._stop_event = threading.Event()  # tells the Yahoo polling thread to exit

    def start(self, interval='1m', poll_seconds=60):
        """Start the bar feed, plus news polling when ``include_sentiment`` is true.

        For Yahoo, the current day's history at *interval* is fetched every
        *poll_seconds*, and only bars newer than the last buffered one are
        appended.
        """
        if self._include_sentiment:
            self.news.start_streaming()
        if self._source == 'alpaca':
            self.streamer = AlpacaStreamer(self._api_key, self._secret_key, self.buffer)
            self.streamer.subscribe(self.tickers)
            return
        import yfinance as yf  # imported here so a missing package fails in the caller
        # Retire any poller left over from a previous start() before spawning a new one.
        self._stop_event.set()
        self._stop_event = stop_event = threading.Event()
        self._thread = threading.Thread(
            target=self._poll_yahoo, args=(yf, interval, poll_seconds, stop_event), daemon=True)
        self._thread.start()

    def _poll_yahoo(self, yf, interval, poll_seconds, stop_event):
        """Background polling loop; exits promptly once *stop_event* is set."""
        import logging
        log = logging.getLogger(__name__)
        while not stop_event.is_set():
            for t in self.tickers:
                try:
                    hist = yf.Ticker(t).history(period='1d', interval=interval)
                    self._append_new_bars(t, hist)
                except Exception:
                    log.exception("Yahoo poll failed for %s", t)
            stop_event.wait(poll_seconds)

    def _append_new_bars(self, ticker, hist):
        """Append the rows of *hist* newer than the last buffered bar; return how many.

        Each poll returns the whole day so far, so without this filter every bar
        would be re-appended on every poll.
        NOTE(review): if the newest returned bar is still forming, it is stored
        as first seen and not refreshed later; confirm this is acceptable.
        """
        last = self.buffer.latest(ticker)
        if last is not None:
            hist = hist[hist.index > last['timestamp']]
        for idx, row in hist.iterrows():
            self.buffer.append(ticker, idx, row['Open'], row['High'], row['Low'], row['Close'], row['Volume'])
        return len(hist)

    def stop(self):
        """Stop every feed started by :meth:`start`."""
        if hasattr(self, 'streamer') and hasattr(self.streamer, 'unsubscribe'):
            self.streamer.unsubscribe()
        self._stop_event.set()
        self.news.stop()

    def get_latest(self, ticker, lookback=60):
        """Return the newest *lookback* buffered bars for *ticker* as a DataFrame."""
        return self.buffer.get_dataframe(ticker, last_n=lookback)

    def get_sentiment(self, ticker):
        """Return recent buffered news sentiment rows for *ticker*."""
        return self.news.get_latest_sentiment(ticker)