import asyncio import json import websockets import logging from dotenv import load_dotenv import os import yaml import time from collections import defaultdict load_dotenv() logger = logging.getLogger(__name__) class BybitWebSocket: def __init__(self, callback=None): self.settings = yaml.safe_load(open("config/settings.yaml")) TESTNET = os.getenv("BYBIT_TESTNET", "true").lower() == "true" self.ws_url = "wss://stream-testnet.bybit.com/v5/public/linear" if TESTNET else "wss://stream.bybit.com/v5/public/linear" self.private_ws_url = "wss://stream-testnet.bybit.com/v5/private" if TESTNET else "wss://stream.bybit.com/v5/private" self.callback = callback self.subscriptions = set() self.data_buffers = defaultdict(list) self.websocket = None self.private_websocket = None self.is_connected = False self.is_private_connected = False self.reconnect_delay = 1 self.max_reconnect_delay = 60 self.heartbeat_interval = 20 self.latest_prices = {} self.orderbooks = {} self.trades = defaultdict(list) self.tickers = {} async def connect(self): while True: try: logger.info(f"Connecting to Bybit WebSocket: {self.ws_url}") self.websocket = await websockets.connect(self.ws_url) self.is_connected = True logger.info("WebSocket connected successfully") asyncio.create_task(self._heartbeat()) if self.subscriptions: await self._resubscribe() await self._message_handler() except Exception as e: logger.error(f"WebSocket connection error: {e}") self.is_connected = False await self._reconnect() async def connect_private(self): API_KEY = os.getenv("BYBIT_API_KEY") API_SECRET = os.getenv("BYBIT_API_SECRET") if not API_KEY or not API_SECRET: logger.warning("Private WebSocket not available - API keys missing") return while True: try: logger.info(f"Connecting to private WebSocket: {self.private_ws_url}") self.private_websocket = await websockets.connect(self.private_ws_url) self.is_private_connected = True logger.info("Private WebSocket connected successfully") await self._authenticate_private() await self._subscribe_private() await self._private_message_handler() except Exception as e: logger.error(f"Private WebSocket connection error: {e}") self.is_private_connected = False await asyncio.sleep(self.reconnect_delay) continue async def _authenticate_private(self): pass async def _subscribe_private(self): subscriptions = [ {"op": "subscribe", "args": ["position.linear"]}, {"op": "subscribe", "args": ["order.linear"]}, {"op": "subscribe", "args": ["wallet"]} ] for sub in subscriptions: await self.private_websocket.send(json.dumps(sub)) logger.info(f"Subscribed to private channel: {sub}") async def subscribe_ticker(self, symbols): if isinstance(symbols, str): symbols = [symbols] args = [f"tickers.{symbol}" for symbol in symbols] subscription = {"op": "subscribe", "args": args} await self.websocket.send(json.dumps(subscription)) self.subscriptions.add(("ticker", tuple(symbols))) logger.info(f"Subscribed to tickers: {symbols}") async def subscribe_kline(self, symbols, intervals=None): if isinstance(symbols, str): symbols = [symbols] if intervals is None: intervals = ["1", "5"] args = [] for symbol in symbols: for interval in intervals: args.append(f"kline.{interval}.{symbol}") subscription = {"op": "subscribe", "args": args} await self.websocket.send(json.dumps(subscription)) self.subscriptions.add(("kline", tuple(symbols), tuple(intervals))) logger.info(f"Subscribed to klines: {symbols} - {intervals}") async def subscribe_orderbook(self, symbols, depth=25): if isinstance(symbols, str): symbols = [symbols] args = [f"orderbook.{depth}.{symbol}" for symbol in symbols] subscription = {"op": "subscribe", "args": args} await self.websocket.send(json.dumps(subscription)) self.subscriptions.add(("orderbook", tuple(symbols), depth)) logger.info(f"Subscribed to orderbook: {symbols} depth={depth}") async def subscribe_trades(self, symbols): if isinstance(symbols, str): symbols = [symbols] args = [f"publicTrade.{symbol}" for symbol in symbols] subscription = {"op": "subscribe", "args": args} await self.websocket.send(json.dumps(subscription)) self.subscriptions.add(("trades", tuple(symbols))) logger.info(f"Subscribed to trades: {symbols}") async def _resubscribe(self): for sub in self.subscriptions: sub_type = sub[0] if sub_type == "ticker": await self.subscribe_ticker(list(sub[1])) elif sub_type == "kline": await self.subscribe_kline(list(sub[1]), list(sub[2])) elif sub_type == "orderbook": await self.subscribe_orderbook(list(sub[1]), sub[2]) elif sub_type == "trades": await self.subscribe_trades(list(sub[1])) async def _heartbeat(self): while self.is_connected: try: ping_msg = {"op": "ping"} await self.websocket.send(json.dumps(ping_msg)) await asyncio.sleep(self.heartbeat_interval) except Exception as e: logger.error(f"Heartbeat error: {e}") break async def _message_handler(self): try: async for message in self.websocket: try: data = json.loads(message) if "topic" in data: await self._process_message(data) elif data.get("op") == "pong": pass elif "success" in data: if data["success"]: logger.info("Subscription successful") else: logger.error(f"Subscription failed: {data}") except json.JSONDecodeError as e: logger.error(f"Invalid JSON received: {e}") except Exception as e: logger.error(f"Message processing error: {e}") except websockets.exceptions.ConnectionClosed: logger.warning("WebSocket connection closed") self.is_connected = False async def _private_message_handler(self): try: async for message in self.private_websocket: try: data = json.loads(message) await self._process_private_message(data) except Exception as e: logger.error(f"Private message processing error: {e}") except websockets.exceptions.ConnectionClosed: logger.warning("Private WebSocket connection closed") self.is_private_connected = False async def _process_message(self, data): topic = data["topic"] payload = data["data"] if topic.startswith("tickers."): symbol = topic.split(".")[1] self.tickers[symbol] = payload self.latest_prices[symbol] = float(payload["lastPrice"]) elif topic.startswith("orderbook."): symbol = topic.split(".")[2] self.orderbooks[symbol] = payload elif topic.startswith("publicTrade."): symbol = topic.split(".")[1] self.trades[symbol].extend(payload) self.trades[symbol] = self.trades[symbol][-100:] elif topic.startswith("kline."): parts = topic.split(".") interval = parts[1] symbol = parts[2] key = f"{symbol}_{interval}" self.data_buffers[key].extend(payload) self.data_buffers[key] = self.data_buffers[key][-200:] if self.callback: await self.callback(data) async def _process_private_message(self, data): if self.callback: await self.callback(data) async def _reconnect(self): self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay) logger.info(f"Reconnecting in {self.reconnect_delay} seconds...") await asyncio.sleep(self.reconnect_delay) def get_latest_price(self, symbol): return self.latest_prices.get(symbol) def get_orderbook(self, symbol): return self.orderbooks.get(symbol) def get_recent_trades(self, symbol, limit=10): return self.trades.get(symbol, [])[-limit:] def get_kline_buffer(self, symbol, interval="1", limit=100): key = f"{symbol}_{interval}" return self.data_buffers.get(key, [])[-limit:] async def close(self): if self.websocket: await self.websocket.close() if self.private_websocket: await self.private_websocket.close() self.is_connected = False self.is_private_connected = False logger.info("WebSocket connections closed")