Spaces:
Paused
Paused
| import asyncio | |
| import json | |
| import websockets | |
| import logging | |
| from dotenv import load_dotenv | |
| import os | |
| import yaml | |
| import time | |
| from collections import defaultdict | |
| load_dotenv() | |
| logger = logging.getLogger(__name__) | |
class BybitWebSocket:
    """Async client for Bybit v5 public (linear) and private WebSocket streams.

    The client keeps small in-memory caches of the most recent market data
    (tickers, last prices, order books, trades and kline candles) and forwards
    every topic message to an optional user callback.

    Typical use: schedule ``connect()`` (and optionally ``connect_private()``)
    as background tasks, call the ``subscribe_*`` methods, and read the caches
    through the ``get_*`` accessors. ``close()`` stops both connection loops.
    """

    # Class-level defaults so the attributes exist even on instances created
    # without running ``__init__``.
    _heartbeat_task = None
    _stop_public = False
    _stop_private = False

    _INITIAL_RECONNECT_DELAY = 1
    # Head-room added to "now" for the auth expiry timestamp.
    _AUTH_EXPIRY_MARGIN_SECONDS = 10
    _MAX_TRADES = 100
    _MAX_CANDLES = 200

    def __init__(self, callback=None):
        """Load settings and initialise connection state and data caches.

        Args:
            callback: Optional callable invoked with each decoded message
                (public topic messages and all private messages). It may be
                a plain function or a coroutine function.
        """
        # Context manager so the settings file handle is closed promptly.
        with open("config/settings.yaml") as settings_file:
            self.settings = yaml.safe_load(settings_file)

        use_testnet = os.getenv("BYBIT_TESTNET", "true").lower() == "true"
        host = "stream-testnet.bybit.com" if use_testnet else "stream.bybit.com"
        self.ws_url = f"wss://{host}/v5/public/linear"
        self.private_ws_url = f"wss://{host}/v5/private"

        self.callback = callback
        self.subscriptions = set()
        self.data_buffers = defaultdict(list)
        self.websocket = None
        self.private_websocket = None
        self.is_connected = False
        self.is_private_connected = False
        self.reconnect_delay = self._INITIAL_RECONNECT_DELAY
        self.max_reconnect_delay = 60
        self.heartbeat_interval = 20
        self.latest_prices = {}
        self.orderbooks = {}
        self.trades = defaultdict(list)
        self.tickers = {}

        self._heartbeat_task = None
        self._stop_public = False
        self._stop_private = False

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------
    async def connect(self):
        """Run the public stream until ``close()`` is called.

        Connects, starts the heartbeat, replays recorded subscriptions and
        processes messages. Whenever the connection ends (error or normal
        close) it backs off via ``_reconnect()`` and tries again.
        """
        self._stop_public = False
        while not self._stop_public:
            try:
                logger.info(f"Connecting to Bybit WebSocket: {self.ws_url}")
                self.websocket = await websockets.connect(self.ws_url)
                if self._stop_public:
                    # close() was requested while the handshake was in flight.
                    break
                self.is_connected = True
                # A successful connection ends the current back-off streak.
                self.reconnect_delay = self._INITIAL_RECONNECT_DELAY
                logger.info("WebSocket connected successfully")
                self._start_heartbeat()
                if self.subscriptions:
                    await self._resubscribe()
                await self._message_handler()
            except Exception as e:
                logger.error(f"WebSocket connection error: {e}")
            finally:
                # Always tear down per-connection resources so that a
                # reconnect never leaves a stale heartbeat or socket behind.
                self.is_connected = False
                self._stop_heartbeat()
                await self._close_quietly(self.websocket)
            if not self._stop_public:
                await self._reconnect()

    async def connect_private(self):
        """Run the private stream until ``close()`` is called.

        Returns immediately (with a warning) when ``BYBIT_API_KEY`` or
        ``BYBIT_API_SECRET`` is not set. Otherwise connects, authenticates,
        subscribes to the private channels and processes messages,
        reconnecting after ``reconnect_delay`` seconds whenever the
        connection ends.
        """
        api_key = os.getenv("BYBIT_API_KEY")
        api_secret = os.getenv("BYBIT_API_SECRET")
        if not api_key or not api_secret:
            logger.warning("Private WebSocket not available - API keys missing")
            return

        self._stop_private = False
        while not self._stop_private:
            try:
                logger.info(f"Connecting to private WebSocket: {self.private_ws_url}")
                self.private_websocket = await websockets.connect(self.private_ws_url)
                if self._stop_private:
                    break
                self.is_private_connected = True
                logger.info("Private WebSocket connected successfully")
                await self._authenticate_private()
                await self._subscribe_private()
                await self._private_message_handler()
            except Exception as e:
                logger.error(f"Private WebSocket connection error: {e}")
            finally:
                self.is_private_connected = False
                await self._close_quietly(self.private_websocket)
            if not self._stop_private:
                # Also wait after a clean disconnect; otherwise a server that
                # keeps closing the socket would cause a tight reconnect loop.
                await asyncio.sleep(self.reconnect_delay)

    async def _authenticate_private(self):
        """Send the ``auth`` request on the private socket.

        The signature is HMAC-SHA256 over ``"GET/realtime" + expires`` keyed
        with the API secret, where ``expires`` is a millisecond timestamp
        slightly in the future. Does nothing (with a warning) when the API
        credentials are not present in the environment.
        """
        # Imported locally so this block does not depend on new file-level
        # imports.
        import hashlib
        import hmac

        api_key = os.getenv("BYBIT_API_KEY")
        api_secret = os.getenv("BYBIT_API_SECRET")
        if not api_key or not api_secret:
            logger.warning("Private WebSocket not available - API keys missing")
            return

        expires = int((time.time() + self._AUTH_EXPIRY_MARGIN_SECONDS) * 1000)
        signature = hmac.new(
            api_secret.encode("utf-8"),
            f"GET/realtime{expires}".encode("utf-8"),
            hashlib.sha256,
        ).hexdigest()
        auth_request = {"op": "auth", "args": [api_key, expires, signature]}
        await self.private_websocket.send(json.dumps(auth_request))

    async def _subscribe_private(self):
        """Subscribe to the position, order and wallet private channels."""
        subscriptions = [
            {"op": "subscribe", "args": ["position.linear"]},
            {"op": "subscribe", "args": ["order.linear"]},
            {"op": "subscribe", "args": ["wallet"]},
        ]
        for sub in subscriptions:
            await self.private_websocket.send(json.dumps(sub))
            logger.info(f"Subscribed to private channel: {sub}")

    async def _reconnect(self):
        """Double the back-off delay (capped) and sleep for that long."""
        self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
        logger.info(f"Reconnecting in {self.reconnect_delay} seconds...")
        await asyncio.sleep(self.reconnect_delay)

    @staticmethod
    async def _close_quietly(socket):
        """Close ``socket`` if there is one, logging instead of raising."""
        if socket is None:
            return
        try:
            await socket.close()
        except Exception as e:
            logger.warning(f"Error while closing WebSocket: {e}")

    async def close(self):
        """Stop both connection loops and close any open sockets."""
        # Set the stop flags first so the loops do not reconnect once their
        # sockets are closed underneath them.
        self._stop_public = True
        self._stop_private = True
        self._stop_heartbeat()
        await self._close_quietly(self.websocket)
        await self._close_quietly(self.private_websocket)
        self.is_connected = False
        self.is_private_connected = False
        logger.info("WebSocket connections closed")

    # ------------------------------------------------------------------
    # Public subscriptions
    # ------------------------------------------------------------------
    @staticmethod
    def _as_list(values):
        """Return ``values`` as a list, wrapping a lone string."""
        if isinstance(values, str):
            return [values]
        return list(values)

    async def _subscribe(self, topics, record, description):
        """Record a subscription and send it if the public socket is up.

        The subscription is recorded before sending so that it is replayed
        by ``_resubscribe()`` after the next (re)connect, including when the
        caller subscribes before ``connect()`` has established a connection.
        """
        if not topics:
            return
        self.subscriptions.add(record)
        if self.websocket is None or not self.is_connected:
            logger.info(f"Queued subscription until connected: {description}")
            return
        await self.websocket.send(json.dumps({"op": "subscribe", "args": topics}))
        logger.info(f"Subscribed to {description}")

    async def subscribe_ticker(self, symbols):
        """Subscribe to ``tickers.<symbol>`` for one symbol or an iterable."""
        symbols = self._as_list(symbols)
        topics = [f"tickers.{symbol}" for symbol in symbols]
        await self._subscribe(topics, ("ticker", tuple(symbols)), f"tickers: {symbols}")

    async def subscribe_kline(self, symbols, intervals=None):
        """Subscribe to ``kline.<interval>.<symbol>`` for every combination.

        Args:
            symbols: One symbol or an iterable of symbols.
            intervals: One interval or an iterable of intervals; defaults to
                ``["1", "5"]``.
        """
        symbols = self._as_list(symbols)
        # A bare string such as "15" must be one interval, not "1" and "5".
        intervals = ["1", "5"] if intervals is None else self._as_list(intervals)
        topics = [
            f"kline.{interval}.{symbol}"
            for symbol in symbols
            for interval in intervals
        ]
        await self._subscribe(
            topics,
            ("kline", tuple(symbols), tuple(intervals)),
            f"klines: {symbols} - {intervals}",
        )

    async def subscribe_orderbook(self, symbols, depth=25):
        """Subscribe to ``orderbook.<depth>.<symbol>`` for the given symbols."""
        symbols = self._as_list(symbols)
        topics = [f"orderbook.{depth}.{symbol}" for symbol in symbols]
        await self._subscribe(
            topics,
            ("orderbook", tuple(symbols), depth),
            f"orderbook: {symbols} depth={depth}",
        )

    async def subscribe_trades(self, symbols):
        """Subscribe to ``publicTrade.<symbol>`` for the given symbols."""
        symbols = self._as_list(symbols)
        topics = [f"publicTrade.{symbol}" for symbol in symbols]
        await self._subscribe(topics, ("trades", tuple(symbols)), f"trades: {symbols}")

    async def _resubscribe(self):
        """Replay every recorded subscription on the current connection."""
        # Iterate over a snapshot: the subscribe_* methods add to the set.
        for sub in tuple(self.subscriptions):
            sub_type = sub[0]
            if sub_type == "ticker":
                await self.subscribe_ticker(list(sub[1]))
            elif sub_type == "kline":
                await self.subscribe_kline(list(sub[1]), list(sub[2]))
            elif sub_type == "orderbook":
                await self.subscribe_orderbook(list(sub[1]), sub[2])
            elif sub_type == "trades":
                await self.subscribe_trades(list(sub[1]))

    # ------------------------------------------------------------------
    # Heartbeat
    # ------------------------------------------------------------------
    def _start_heartbeat(self):
        """Start the heartbeat task, replacing any previous one."""
        self._stop_heartbeat()
        # Keep a reference: un-referenced tasks may be garbage collected.
        self._heartbeat_task = asyncio.create_task(self._heartbeat())

    def _stop_heartbeat(self):
        """Cancel the running heartbeat task, if any."""
        task, self._heartbeat_task = self._heartbeat_task, None
        if task is not None and not task.done():
            task.cancel()

    async def _heartbeat(self):
        """Send ``{"op": "ping"}`` every ``heartbeat_interval`` seconds."""
        while self.is_connected:
            try:
                await self.websocket.send(json.dumps({"op": "ping"}))
                await asyncio.sleep(self.heartbeat_interval)
            except Exception as e:
                logger.error(f"Heartbeat error: {e}")
                break

    @staticmethod
    def _is_heartbeat_ack(data):
        """Return True if ``data`` is the server's reply to a ping.

        Accepts both reply shapes: ``op == "pong"`` and a reply carrying
        ``ret_msg == "pong"`` (which echoes ``op == "ping"``).
        """
        return data.get("op") == "pong" or data.get("ret_msg") == "pong"

    # ------------------------------------------------------------------
    # Message handling
    # ------------------------------------------------------------------
    async def _message_handler(self):
        """Read public messages until the connection closes."""
        try:
            async for message in self.websocket:
                try:
                    data = json.loads(message)
                    if "topic" in data:
                        await self._process_message(data)
                    elif self._is_heartbeat_ack(data):
                        # Must be checked before "success": pong replies may
                        # carry a "success" field as well.
                        pass
                    elif "success" in data:
                        if data["success"]:
                            logger.info("Subscription successful")
                        else:
                            logger.error(f"Subscription failed: {data}")
                except json.JSONDecodeError as e:
                    logger.error(f"Invalid JSON received: {e}")
                except Exception as e:
                    logger.error(f"Message processing error: {e}")
        except websockets.exceptions.ConnectionClosed:
            logger.warning("WebSocket connection closed")
            self.is_connected = False

    async def _private_message_handler(self):
        """Read private messages until the connection closes."""
        try:
            async for message in self.private_websocket:
                try:
                    data = json.loads(message)
                    await self._process_private_message(data)
                except Exception as e:
                    logger.error(f"Private message processing error: {e}")
        except websockets.exceptions.ConnectionClosed:
            logger.warning("Private WebSocket connection closed")
            self.is_private_connected = False

    async def _dispatch(self, data):
        """Invoke the user callback, awaiting it only if it is awaitable."""
        if not self.callback:
            return
        import inspect

        result = self.callback(data)
        if inspect.isawaitable(result):
            await result

    @staticmethod
    def _merge_orderbook(book, delta):
        """Return ``book`` with ``delta`` applied.

        Levels are ``[price, size]`` pairs under ``"b"`` (bids) and ``"a"``
        (asks). A delta level with size zero removes that price; any other
        size inserts or replaces it. Bids are returned sorted from highest
        to lowest price and asks from lowest to highest.
        """
        merged = {**book, **delta}
        for side, descending in (("b", True), ("a", False)):
            levels = {price: size for price, size in book.get(side, [])}
            for price, size in delta.get(side, []):
                if float(size) == 0:
                    levels.pop(price, None)
                else:
                    levels[price] = size
            merged[side] = sorted(
                ([price, size] for price, size in levels.items()),
                key=lambda level: float(level[0]),
                reverse=descending,
            )
        return merged

    @staticmethod
    def _merge_candles(buffer, candles):
        """Append ``candles`` to ``buffer``, replacing an in-progress bar.

        The stream pushes repeated updates for the candle that is still
        forming; an update whose ``start`` equals that of the last buffered
        candle replaces it instead of being appended as a duplicate.
        """
        for candle in candles:
            latest = buffer[-1] if buffer else None
            same_bar = (
                isinstance(candle, dict)
                and isinstance(latest, dict)
                and candle.get("start") is not None
                and candle.get("start") == latest.get("start")
            )
            if same_bar:
                buffer[-1] = candle
            else:
                buffer.append(candle)

    async def _process_message(self, data):
        """Update the caches from one public topic message and notify.

        Messages whose ``type`` is ``"delta"`` are merged into the cached
        ticker / order book; anything else replaces the cached value.
        """
        topic = data["topic"]
        payload = data["data"]
        is_delta = data.get("type") == "delta"
        parts = topic.split(".")

        if topic.startswith("tickers."):
            symbol = parts[1]
            previous = self.tickers.get(symbol)
            if is_delta and isinstance(previous, dict) and isinstance(payload, dict):
                # Deltas only carry changed fields; keep the rest.
                ticker = {**previous, **payload}
            else:
                ticker = payload
            self.tickers[symbol] = ticker
            if "lastPrice" in ticker:
                self.latest_prices[symbol] = float(ticker["lastPrice"])
        elif topic.startswith("orderbook."):
            symbol = parts[2]
            previous = self.orderbooks.get(symbol)
            if is_delta and isinstance(previous, dict) and isinstance(payload, dict):
                self.orderbooks[symbol] = self._merge_orderbook(previous, payload)
            else:
                self.orderbooks[symbol] = payload
        elif topic.startswith("publicTrade."):
            symbol = parts[1]
            self.trades[symbol].extend(payload)
            self.trades[symbol] = self.trades[symbol][-self._MAX_TRADES:]
        elif topic.startswith("kline."):
            interval = parts[1]
            symbol = parts[2]
            key = f"{symbol}_{interval}"
            buffer = self.data_buffers[key]
            self._merge_candles(buffer, payload)
            self.data_buffers[key] = buffer[-self._MAX_CANDLES:]

        await self._dispatch(data)

    async def _process_private_message(self, data):
        """Forward a private message to the user callback."""
        await self._dispatch(data)

    # ------------------------------------------------------------------
    # Cache accessors
    # ------------------------------------------------------------------
    def get_latest_price(self, symbol):
        """Return the last cached price for ``symbol`` as a float, or None."""
        return self.latest_prices.get(symbol)

    def get_orderbook(self, symbol):
        """Return the cached order book for ``symbol``, or None."""
        return self.orderbooks.get(symbol)

    def get_recent_trades(self, symbol, limit=10):
        """Return up to ``limit`` most recent cached trades for ``symbol``."""
        if limit <= 0:
            # ``[-0:]`` would return the whole list rather than nothing.
            return []
        return self.trades.get(symbol, [])[-limit:]

    def get_kline_buffer(self, symbol, interval="1", limit=100):
        """Return up to ``limit`` most recent cached candles."""
        if limit <= 0:
            return []
        key = f"{symbol}_{interval}"
        return self.data_buffers.get(key, [])[-limit:]