import asyncio
import inspect
import json
import logging
from typing import Callable, Optional

import aiohttp
import websockets


logger = logging.getLogger(__name__)


class PolymarketClient:
    """Async client for a Polymarket REST endpoint and WebSocket feed.

    The client owns one ``aiohttp`` session for REST lookups and one
    WebSocket connection. Every WebSocket message is decoded from JSON and
    passed to the callback registered with :meth:`set_callback` as
    ``callback("polymarket", data)``.

    When the WebSocket drops, it is re-opened with exponential backoff and
    the markets previously passed to :meth:`subscribe` are sent again.
    """

    # Upper bound (seconds) for the reconnect backoff delay.
    _MAX_BACKOFF_SECONDS = 60

    def __init__(self, rest_url: str, ws_url: str):
        """Store the endpoints; no network I/O happens until :meth:`connect`.

        Args:
            rest_url: Base URL of the REST API (``/markets/<id>`` is appended).
            ws_url: URL of the WebSocket feed.
        """
        self.rest_url = rest_url
        self.ws_url = ws_url
        self.session: Optional[aiohttp.ClientSession] = None
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.on_message_callback: Optional[Callable] = None
        # Strong reference to the listener task: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected.
        self._listen_task: Optional[asyncio.Task] = None
        # Insertion-ordered set of market ids to re-send after a reconnect.
        self._subscribed_ids: dict[str, None] = {}
        # Set by close() so the listener and backoff loop stop reconnecting.
        self._closing = False

    async def connect(self):
        """Establish the HTTP session and the WebSocket connection.

        Returns once the WebSocket is connected; failed connection attempts
        are retried indefinitely with backoff. An existing open HTTP session
        is reused instead of being replaced (and leaked).
        """
        self._closing = False
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        await self._connect_ws()

    async def _connect_ws(self):
        """Connect the WebSocket, retrying with exponential backoff.

        The delay starts at 1 second and doubles after each failure up to
        ``_MAX_BACKOFF_SECONDS``. After a successful connect, remembered
        subscriptions are re-sent and a listener task is started. Returns
        without connecting if :meth:`close` has been called.
        """
        backoff = 1
        while not self._closing:
            try:
                logger.info("Connecting to Polymarket WS: %s", self.ws_url)
                self.ws = await websockets.connect(self.ws_url)
                logger.info("Polymarket WS connected.")
                if self._subscribed_ids:
                    # NOTE(review): assumes subscriptions are cumulative, so
                    # one message with every remembered id restores the feed
                    # -- confirm against Polymarket's WS documentation.
                    await self._send_subscription(list(self._subscribed_ids))
            except Exception as e:
                logger.error(
                    "WebSocket connection failed: %s. Retrying in %ss...",
                    e,
                    backoff,
                )
                # Release whatever socket is currently held (a fresh one whose
                # resubscribe failed, or the stale previous one) before retrying.
                if self.ws is not None:
                    await self._close_quietly(self.ws)
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, self._MAX_BACKOFF_SECONDS)
                continue
            self._listen_task = asyncio.create_task(self._listen())
            return

    def set_callback(self, callback: Callable):
        """Register the handler invoked as ``callback("polymarket", data)``.

        The callback may be a coroutine function or a plain function; its
        result is awaited only when it is awaitable.
        """
        self.on_message_callback = callback

    async def subscribe(self, market_ids: list[str]):
        """Subscribe to CLOB updates for the given markets.

        Logs an error and does nothing when the WebSocket is not connected.
        Successfully sent ids are remembered so they can be re-sent after a
        reconnect.

        Args:
            market_ids: Identifiers sent as the ``assets_ids`` field.
        """
        if not self.ws:
            logger.error("WebSocket not connected. Call connect() first.")
            return

        await self._send_subscription(market_ids)
        self._subscribed_ids.update(dict.fromkeys(market_ids))
        logger.info("Subscribed to Polymarket markets: %s", market_ids)

    async def _send_subscription(self, market_ids: list[str]):
        """Send one ``market`` subscription message on the current socket."""
        payload = {
            "assets_ids": market_ids,
            "type": "market",
        }
        await self.ws.send(json.dumps(payload))

    async def _listen(self):
        """Consume WebSocket messages until the connection ends, then reconnect.

        Per-message problems (invalid JSON, a failing callback) are logged and
        skipped so that one bad message does not tear down the connection.
        A reconnect is attempted when the connection ends for any reason,
        unless :meth:`close` was called or another connection has already
        replaced this one.
        """
        ws = self.ws
        try:
            async for message in ws:
                await self._handle_message(message)
        except websockets.exceptions.ConnectionClosed:
            pass
        except Exception as e:
            logger.error("Polymarket WS listen error: %s", e)
            # The socket may still be open after an unexpected error; close it
            # so the reconnect below does not leak it.
            if ws is not None:
                await self._close_quietly(ws)

        # Iteration can also end without raising (the websockets iterator
        # stops silently on a normal close), so the reconnect decision is made
        # here rather than only inside the except branches.
        if self._closing or self.ws is not ws:
            return
        logger.warning("Polymarket WS connection closed. Reconnecting...")
        await self._connect_ws()

    async def _handle_message(self, message):
        """Decode one raw message and pass it to the registered callback."""
        try:
            data = json.loads(message)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            logger.warning("Ignoring non-JSON Polymarket WS message: %.200r", message)
            return

        callback = self.on_message_callback
        if callback is None:
            return
        try:
            result = callback("polymarket", data)
            if inspect.isawaitable(result):
                await result
        except Exception:
            logger.exception("Polymarket message callback failed")

    @staticmethod
    async def _close_quietly(ws):
        """Close *ws*, logging (not raising) any error; used for stale sockets."""
        try:
            await ws.close()
        except Exception:
            logger.debug("Ignoring error while closing Polymarket WS", exc_info=True)

    async def get_market(self, market_id: str):
        """Fetch REST market details to build a mapping.

        Args:
            market_id: Identifier appended to ``<rest_url>/markets/``.

        Returns:
            The decoded JSON body on HTTP 200; ``None`` on any other status
            or when no HTTP session exists yet (an error is logged in both
            cases). Exceptions raised by the HTTP request itself propagate.
        """
        if self.session is None:
            logger.error("HTTP session not initialised. Call connect() first.")
            return None

        url = f"{self.rest_url}/markets/{market_id}"
        async with self.session.get(url) as response:
            if response.status == 200:
                return await response.json()
            logger.error(
                "Failed to fetch Polymarket market %s: %s",
                market_id,
                response.status,
            )
            return None

    def normalize_price(self, price: float) -> float:
        """Return *price* unchanged.

        Polymarket prices are treated as already being on a 0-1 scale
        (a probability, or dollars between 0 and 1), so no conversion is
        applied.
        """
        return price

    async def close(self):
        """Stop listening and release the WebSocket and the HTTP session.

        The listener task is cancelled first so it cannot reconnect while
        the client is shutting down. The HTTP session is closed even if
        closing the WebSocket raises.
        """
        self._closing = True

        task, self._listen_task = self._listen_task, None
        # When close() runs inside the listener itself (e.g. called from the
        # message callback) the task must not cancel or await itself; the
        # _closing flag alone makes the listener exit without reconnecting.
        if (
            task is not None
            and not task.done()
            and task is not asyncio.current_task()
        ):
            task.cancel()
            # gather(..., return_exceptions=True) absorbs the listener's
            # CancelledError while still propagating a cancellation of close().
            await asyncio.gather(task, return_exceptions=True)

        try:
            if self.ws is not None:
                await self.ws.close()
        finally:
            if self.session is not None:
                await self.session.close()
|