scalperBot / core /websockets.py
nexusbert's picture
Upload 36 files
96e0cc2 verified
import asyncio
import json
import websockets
import logging
from dotenv import load_dotenv
import os
import yaml
import time
from collections import defaultdict
load_dotenv()
logger = logging.getLogger(__name__)
class BybitWebSocket:
def __init__(self, callback=None):
self.settings = yaml.safe_load(open("config/settings.yaml"))
TESTNET = os.getenv("BYBIT_TESTNET", "true").lower() == "true"
self.ws_url = "wss://stream-testnet.bybit.com/v5/public/linear" if TESTNET else "wss://stream.bybit.com/v5/public/linear"
self.private_ws_url = "wss://stream-testnet.bybit.com/v5/private" if TESTNET else "wss://stream.bybit.com/v5/private"
self.callback = callback
self.subscriptions = set()
self.data_buffers = defaultdict(list)
self.websocket = None
self.private_websocket = None
self.is_connected = False
self.is_private_connected = False
self.reconnect_delay = 1
self.max_reconnect_delay = 60
self.heartbeat_interval = 20
self.latest_prices = {}
self.orderbooks = {}
self.trades = defaultdict(list)
self.tickers = {}
async def connect(self):
while True:
try:
logger.info(f"Connecting to Bybit WebSocket: {self.ws_url}")
self.websocket = await websockets.connect(self.ws_url)
self.is_connected = True
logger.info("WebSocket connected successfully")
asyncio.create_task(self._heartbeat())
if self.subscriptions:
await self._resubscribe()
await self._message_handler()
except Exception as e:
logger.error(f"WebSocket connection error: {e}")
self.is_connected = False
await self._reconnect()
async def connect_private(self):
API_KEY = os.getenv("BYBIT_API_KEY")
API_SECRET = os.getenv("BYBIT_API_SECRET")
if not API_KEY or not API_SECRET:
logger.warning("Private WebSocket not available - API keys missing")
return
while True:
try:
logger.info(f"Connecting to private WebSocket: {self.private_ws_url}")
self.private_websocket = await websockets.connect(self.private_ws_url)
self.is_private_connected = True
logger.info("Private WebSocket connected successfully")
await self._authenticate_private()
await self._subscribe_private()
await self._private_message_handler()
except Exception as e:
logger.error(f"Private WebSocket connection error: {e}")
self.is_private_connected = False
await asyncio.sleep(self.reconnect_delay)
continue
async def _authenticate_private(self):
pass
async def _subscribe_private(self):
subscriptions = [
{"op": "subscribe", "args": ["position.linear"]},
{"op": "subscribe", "args": ["order.linear"]},
{"op": "subscribe", "args": ["wallet"]}
]
for sub in subscriptions:
await self.private_websocket.send(json.dumps(sub))
logger.info(f"Subscribed to private channel: {sub}")
async def subscribe_ticker(self, symbols):
if isinstance(symbols, str):
symbols = [symbols]
args = [f"tickers.{symbol}" for symbol in symbols]
subscription = {"op": "subscribe", "args": args}
await self.websocket.send(json.dumps(subscription))
self.subscriptions.add(("ticker", tuple(symbols)))
logger.info(f"Subscribed to tickers: {symbols}")
async def subscribe_kline(self, symbols, intervals=None):
if isinstance(symbols, str):
symbols = [symbols]
if intervals is None:
intervals = ["1", "5"]
args = []
for symbol in symbols:
for interval in intervals:
args.append(f"kline.{interval}.{symbol}")
subscription = {"op": "subscribe", "args": args}
await self.websocket.send(json.dumps(subscription))
self.subscriptions.add(("kline", tuple(symbols), tuple(intervals)))
logger.info(f"Subscribed to klines: {symbols} - {intervals}")
async def subscribe_orderbook(self, symbols, depth=25):
if isinstance(symbols, str):
symbols = [symbols]
args = [f"orderbook.{depth}.{symbol}" for symbol in symbols]
subscription = {"op": "subscribe", "args": args}
await self.websocket.send(json.dumps(subscription))
self.subscriptions.add(("orderbook", tuple(symbols), depth))
logger.info(f"Subscribed to orderbook: {symbols} depth={depth}")
async def subscribe_trades(self, symbols):
if isinstance(symbols, str):
symbols = [symbols]
args = [f"publicTrade.{symbol}" for symbol in symbols]
subscription = {"op": "subscribe", "args": args}
await self.websocket.send(json.dumps(subscription))
self.subscriptions.add(("trades", tuple(symbols)))
logger.info(f"Subscribed to trades: {symbols}")
async def _resubscribe(self):
for sub in self.subscriptions:
sub_type = sub[0]
if sub_type == "ticker":
await self.subscribe_ticker(list(sub[1]))
elif sub_type == "kline":
await self.subscribe_kline(list(sub[1]), list(sub[2]))
elif sub_type == "orderbook":
await self.subscribe_orderbook(list(sub[1]), sub[2])
elif sub_type == "trades":
await self.subscribe_trades(list(sub[1]))
async def _heartbeat(self):
while self.is_connected:
try:
ping_msg = {"op": "ping"}
await self.websocket.send(json.dumps(ping_msg))
await asyncio.sleep(self.heartbeat_interval)
except Exception as e:
logger.error(f"Heartbeat error: {e}")
break
async def _message_handler(self):
try:
async for message in self.websocket:
try:
data = json.loads(message)
if "topic" in data:
await self._process_message(data)
elif data.get("op") == "pong":
pass
elif "success" in data:
if data["success"]:
logger.info("Subscription successful")
else:
logger.error(f"Subscription failed: {data}")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON received: {e}")
except Exception as e:
logger.error(f"Message processing error: {e}")
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket connection closed")
self.is_connected = False
async def _private_message_handler(self):
try:
async for message in self.private_websocket:
try:
data = json.loads(message)
await self._process_private_message(data)
except Exception as e:
logger.error(f"Private message processing error: {e}")
except websockets.exceptions.ConnectionClosed:
logger.warning("Private WebSocket connection closed")
self.is_private_connected = False
async def _process_message(self, data):
topic = data["topic"]
payload = data["data"]
if topic.startswith("tickers."):
symbol = topic.split(".")[1]
self.tickers[symbol] = payload
self.latest_prices[symbol] = float(payload["lastPrice"])
elif topic.startswith("orderbook."):
symbol = topic.split(".")[2]
self.orderbooks[symbol] = payload
elif topic.startswith("publicTrade."):
symbol = topic.split(".")[1]
self.trades[symbol].extend(payload)
self.trades[symbol] = self.trades[symbol][-100:]
elif topic.startswith("kline."):
parts = topic.split(".")
interval = parts[1]
symbol = parts[2]
key = f"{symbol}_{interval}"
self.data_buffers[key].extend(payload)
self.data_buffers[key] = self.data_buffers[key][-200:]
if self.callback:
await self.callback(data)
async def _process_private_message(self, data):
if self.callback:
await self.callback(data)
async def _reconnect(self):
self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
logger.info(f"Reconnecting in {self.reconnect_delay} seconds...")
await asyncio.sleep(self.reconnect_delay)
def get_latest_price(self, symbol):
return self.latest_prices.get(symbol)
def get_orderbook(self, symbol):
return self.orderbooks.get(symbol)
def get_recent_trades(self, symbol, limit=10):
return self.trades.get(symbol, [])[-limit:]
def get_kline_buffer(self, symbol, interval="1", limit=100):
key = f"{symbol}_{interval}"
return self.data_buffers.get(key, [])[-limit:]
async def close(self):
if self.websocket:
await self.websocket.close()
if self.private_websocket:
await self.private_websocket.close()
self.is_connected = False
self.is_private_connected = False
logger.info("WebSocket connections closed")