arbintel / src /clients /polymarket.py
AJAY KASU
Add root app.py for Streamlit GUI and dependencies
77fd2f6
import asyncio
import json
import logging
import aiohttp
import websockets
from typing import Callable, Optional
logger = logging.getLogger(__name__)
class PolymarketClient:
def __init__(self, rest_url: str, ws_url: str):
self.rest_url = rest_url
self.ws_url = ws_url
self.session: Optional[aiohttp.ClientSession] = None
self.ws: Optional[websockets.WebSocketClientProtocol] = None
self.on_message_callback: Optional[Callable] = None
async def connect(self):
"""Establish HTTP session and WebSocket connection."""
self.session = aiohttp.ClientSession()
await self._connect_ws()
async def _connect_ws(self):
"""Connect to WebSocket with exponential backoff for reconnections."""
backoff = 1
while True:
try:
logger.info(f"Connecting to Polymarket WS: {self.ws_url}")
self.ws = await websockets.connect(self.ws_url)
logger.info("Polymarket WS connected.")
asyncio.create_task(self._listen())
break
except Exception as e:
logger.error(f"WebSocket connection failed: {e}. Retrying in {backoff}s...")
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 60)
def set_callback(self, callback: Callable):
self.on_message_callback = callback
async def subscribe(self, market_ids: list[str]):
"""Subscribe to specific market CLOB updates."""
if not self.ws:
logger.error("WebSocket not connected. Call connect() first.")
return
payload = {
"assets_ids": market_ids,
"type": "market"
}
await self.ws.send(json.dumps(payload))
logger.info(f"Subscribed to Polymarket markets: {market_ids}")
async def _listen(self):
"""Listen to WS messages and handle them."""
try:
async for message in self.ws:
data = json.loads(message)
if self.on_message_callback:
await self.on_message_callback("polymarket", data)
except websockets.exceptions.ConnectionClosed:
logger.warning("Polymarket WS connection closed. Reconnecting...")
await self._connect_ws()
except Exception as e:
logger.error(f"Polymarket WS listen error: {e}")
await self._connect_ws()
async def get_market(self, market_id: str):
"""Fetch REST market details to build mapping."""
async with self.session.get(f"{self.rest_url}/markets/{market_id}") as response:
if response.status == 200:
return await response.json()
else:
logger.error(f"Failed to fetch Polymarket market {market_id}: {response.status}")
return None
def normalize_price(self, price: float) -> float:
"""Polymarket prices are basically probabilities inherently or scale of 0-1 dollars."""
return price
async def close(self):
if self.ws:
await self.ws.close()
if self.session:
await self.session.close()