Premchan369 commited on
Commit
90db8a6
·
verified ·
1 Parent(s): 46cedba

Upload realtime_data.py

Browse files
Files changed (1) hide show
  1. realtime_data.py +253 -0
realtime_data.py ADDED
@@ -0,0 +1,253 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Real-Time Data Streaming - WebSocket feeds and live processing"""
2
+ import numpy as np
3
+ import pandas as pd
4
+ import threading
5
+ import time
6
+ import json
7
+ from typing import Dict, List, Optional, Callable
8
+ from collections import deque
9
+ from datetime import datetime, timedelta
10
+ import warnings
11
+ warnings.filterwarnings('ignore')
12
+
13
+
14
+ class LiveDataBuffer:
15
+ """Thread-safe circular buffer for real-time market data"""
16
+
17
+ def __init__(self, max_len=10000):
18
+ self.max_len = max_len
19
+ self._data = {}
20
+ self._lock = threading.Lock()
21
+
22
+ def append(self, ticker, timestamp, open_p, high, low, close, volume, **kwargs):
23
+ with self._lock:
24
+ if ticker not in self._data:
25
+ self._data[ticker] = deque(maxlen=self.max_len)
26
+ self._data[ticker].append({
27
+ 'timestamp': timestamp, 'Open': open_p, 'High': high,
28
+ 'Low': low, 'Close': close, 'Volume': volume, **kwargs
29
+ })
30
+
31
+ def get_dataframe(self, ticker, last_n=None):
32
+ with self._lock:
33
+ if ticker not in self._data:
34
+ return pd.DataFrame()
35
+ data = list(self._data[ticker])
36
+ if last_n:
37
+ data = data[-last_n:]
38
+ return pd.DataFrame(data).set_index('timestamp')
39
+
40
+ def latest(self, ticker):
41
+ with self._lock:
42
+ if ticker not in self._data or len(self._data[ticker]) == 0:
43
+ return None
44
+ return self._data[ticker][-1]
45
+
46
+
47
+ class AlpacaStreamer:
48
+ """Alpaca Markets WebSocket - free real-time IEX data"""
49
+
50
+ def __init__(self, api_key, secret_key, buffer=None, paper=True):
51
+ self.api_key = api_key
52
+ self.secret_key = secret_key
53
+ self.paper = paper
54
+ self.buffer = buffer or LiveDataBuffer()
55
+ self._running = False
56
+ self._callbacks = []
57
+
58
+ def subscribe(self, tickers):
59
+ self._running = True
60
+ url = "wss://paper-api.alpaca.markets/stream" if self.paper else "wss://data.alpaca.markets/stream"
61
+
62
+ def on_message(ws, message):
63
+ data = json.loads(message)
64
+ if data.get('T') in ('b', 't'):
65
+ self.buffer.append(
66
+ ticker=data['S'], timestamp=pd.Timestamp(data['t']),
67
+ open_p=data.get('o', data.get('p', 0)),
68
+ high=data.get('h', data.get('p', 0)),
69
+ low=data.get('l', data.get('p', 0)),
70
+ close=data.get('c', data.get('p', 0)),
71
+ volume=data.get('v', data.get('s', 0)),
72
+ vwap=data.get('vw', data.get('p', 0))
73
+ )
74
+ for cb in self._callbacks:
75
+ cb(data)
76
+
77
+ def on_open(ws):
78
+ ws.send(json.dumps({"action": "auth", "key": self.api_key, "secret": self.secret_key}))
79
+ time.sleep(1)
80
+ for t in tickers:
81
+ ws.send(json.dumps({"action": "subscribe", "bars": [t]}))
82
+
83
+ import websocket
84
+ self._ws = websocket.WebSocketApp(url, on_open=on_open, on_message=on_message)
85
+ self._thread = threading.Thread(target=self._ws.run_forever, daemon=True)
86
+ self._thread.start()
87
+
88
+ def on_data(self, callback):
89
+ self._callbacks.append(callback)
90
+
91
+ def unsubscribe(self):
92
+ self._running = False
93
+ if hasattr(self, '_ws'):
94
+ self._ws.close()
95
+
96
+
97
+ class NewsStreamAggregator:
98
+ """Aggregate news from Yahoo Finance + FMP + custom RSS"""
99
+
100
+ def __init__(self, tickers, sentiment_model=None):
101
+ self.tickers = tickers
102
+ self.sentiment_model = sentiment_model
103
+ self.news_buffer = deque(maxlen=5000)
104
+ self._running = False
105
+
106
+ def fetch_yahoo_news(self, ticker):
107
+ import yfinance as yf
108
+ try:
109
+ tk = yf.Ticker(ticker)
110
+ news = tk.news or []
111
+ articles = []
112
+ for item in news:
113
+ articles.append({
114
+ 'ticker': ticker,
115
+ 'timestamp': pd.Timestamp(item.get('publish_time', 0), unit='s') if 'publish_time' in item else pd.Timestamp.now(),
116
+ 'title': item.get('title', ''),
117
+ 'source': item.get('publisher', 'yahoo'),
118
+ 'text': item.get('title', ''),
119
+ })
120
+ return articles
121
+ except Exception as e:
122
+ return []
123
+
124
+ def fetch_fmp_news(self, api_key='demo'):
125
+ import requests
126
+ try:
127
+ url = "https://financialmodelingprep.com/api/v3/stock_news"
128
+ resp = requests.get(url, params={'tickers': ','.join(self.tickers), 'limit': 100, 'apikey': api_key}, timeout=10)
129
+ if resp.status_code == 200:
130
+ articles = []
131
+ for item in resp.json():
132
+ articles.append({
133
+ 'ticker': item.get('symbol', ''),
134
+ 'timestamp': pd.Timestamp(item.get('publishedDate', '')),
135
+ 'title': item.get('title', ''),
136
+ 'source': 'fmp',
137
+ 'text': item.get('text', item.get('title', '')),
138
+ })
139
+ return articles
140
+ except:
141
+ pass
142
+ return []
143
+
144
+ def process_sentiment(self, articles):
145
+ df = pd.DataFrame(articles)
146
+ if len(df) == 0 or self.sentiment_model is None:
147
+ if len(df) > 0:
148
+ df['sentiment_score'] = 0.0
149
+ return df
150
+ texts = (df['title'].fillna('') + ' ' + df.get('text', pd.Series(['']*len(df))).fillna('')).tolist()
151
+ sentiments = self.sentiment_model.analyze_batch(texts)
152
+ df['sentiment_score'] = [s['sentiment_score'] for s in sentiments]
153
+ df['sentiment_label'] = [s['label'] for s in sentiments]
154
+ return df
155
+
156
+ def start_streaming(self, poll_seconds=300):
157
+ self._running = True
158
+ def poll():
159
+ while self._running:
160
+ all_articles = []
161
+ for t in self.tickers:
162
+ all_articles.extend(self.fetch_yahoo_news(t))
163
+ all_articles.extend(self.fetch_fmp_news())
164
+ if all_articles:
165
+ df = self.process_sentiment(all_articles)
166
+ for _, row in df.iterrows():
167
+ self.news_buffer.append(row.to_dict())
168
+ time.sleep(poll_seconds)
169
+ self._thread = threading.Thread(target=poll, daemon=True)
170
+ self._thread.start()
171
+
172
+ def stop(self):
173
+ self._running = False
174
+
175
+ def get_latest_sentiment(self, ticker, hours=24):
176
+ if len(self.news_buffer) == 0:
177
+ return pd.DataFrame()
178
+ df = pd.DataFrame(list(self.news_buffer))
179
+ if 'ticker' not in df.columns:
180
+ return df
181
+ cutoff = pd.Timestamp.now() - timedelta(hours=hours)
182
+ return df[(df['ticker'] == ticker) & (df['timestamp'] > cutoff)]
183
+
184
+
185
+ class OrderFlowEstimator:
186
+ """Estimate order flow imbalance from tick data"""
187
+
188
+ def __init__(self, lookback=1000):
189
+ self.lookback = lookback
190
+ self._trades = {}
191
+
192
+ def process_trade(self, ticker, price, volume, timestamp=None, side='auto'):
193
+ if ticker not in self._trades:
194
+ self._trades[ticker] = deque(maxlen=self.lookback)
195
+ if side == 'auto' and len(self._trades[ticker]) > 0:
196
+ last_price = self._trades[ticker][-1]['price']
197
+ side = 'buy' if price > last_price else ('sell' if price < last_price else 'neutral')
198
+ self._trades[ticker].append({'price': price, 'volume': volume, 'timestamp': timestamp or pd.Timestamp.now(), 'side': side})
199
+
200
+ def get_imbalance(self, ticker, window=100):
201
+ if ticker not in self._trades:
202
+ return {'ofi': 0, 'buy_volume': 0, 'sell_volume': 0}
203
+ df = pd.DataFrame(list(self._trades[ticker])[-window:])
204
+ buy_vol = df[df['side'] == 'buy']['volume'].sum()
205
+ sell_vol = df[df['side'] == 'sell']['volume'].sum()
206
+ total = buy_vol + sell_vol
207
+ ofi = (buy_vol - sell_vol) / total if total > 0 else 0
208
+ return {'ofi': ofi, 'buy_volume': int(buy_vol), 'sell_volume': int(sell_vol)}
209
+
210
+
211
+ class RealtimeFeatureEngine:
212
+ """Real-time feature computation pipeline"""
213
+
214
+ def __init__(self, tickers, data_source='yahoo', api_key='', secret_key='',
215
+ include_sentiment=True, sentiment_model=None):
216
+ self.tickers = tickers
217
+ self.buffer = LiveDataBuffer(max_len=50000)
218
+ self.order_flow = OrderFlowEstimator()
219
+ self.news = NewsStreamAggregator(tickers, sentiment_model)
220
+ self._source = data_source
221
+ self._api_key = api_key
222
+ self._secret_key = secret_key
223
+
224
+ def start(self, interval='1m', poll_seconds=60):
225
+ self.news.start_streaming()
226
+ if self._source == 'alpaca':
227
+ self.streamer = AlpacaStreamer(self._api_key, self._secret_key, self.buffer)
228
+ self.streamer.subscribe(self.tickers)
229
+ else:
230
+ import yfinance as yf
231
+ def poll():
232
+ while True:
233
+ for t in self.tickers:
234
+ try:
235
+ hist = yf.Ticker(t).history(period='1d', interval=interval)
236
+ for idx, row in hist.iterrows():
237
+ self.buffer.append(t, idx, row['Open'], row['High'], row['Low'], row['Close'], row['Volume'])
238
+ except:
239
+ pass
240
+ time.sleep(poll_seconds)
241
+ self._thread = threading.Thread(target=poll, daemon=True)
242
+ self._thread.start()
243
+
244
+ def stop(self):
245
+ if hasattr(self, 'streamer') and hasattr(self.streamer, 'unsubscribe'):
246
+ self.streamer.unsubscribe()
247
+ self.news.stop()
248
+
249
+ def get_latest(self, ticker, lookback=60):
250
+ return self.buffer.get_dataframe(ticker, last_n=lookback)
251
+
252
+ def get_sentiment(self, ticker):
253
+ return self.news.get_latest_sentiment(ticker)