"""Ingest Polymarket and Kalshi WebSocket messages into the ``tick_data`` table."""

import asyncio
import logging
import asyncpg
from datetime import datetime, timezone
import os

from src.config import (
    POLYMARKET_REST_API,
    POLYMARKET_WS_API,
    KALSHI_REST_API,
    KALSHI_WS_API,
    KALSHI_API_KEY,
    KALSHI_PRIVATE_KEY,
    DATABASE_URL,
)
from src.clients.polymarket import PolymarketClient
from src.clients.kalshi import KalshiClient

logger = logging.getLogger(__name__)

# Used when the configured Polymarket REST URL is unset or empty.
DEFAULT_POLYMARKET_REST_API = "https://gamma-api.polymarket.com"

# Seconds to wait for in-flight queries before force-closing the pool.
_POOL_CLOSE_TIMEOUT = 10

_INSERT_TICK_SQL = """
    INSERT INTO tick_data (
        timestamp, market_id, platform, event_name, outcome,
        bid_price, bid_size, ask_price, ask_size, mid_price, volume_24h
    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
    ON CONFLICT (timestamp, market_id, platform, outcome) DO NOTHING;
"""


class DataIngestionPipeline:
    """Wire the Polymarket and Kalshi clients to a database-backed tick sink.

    Attributes:
        db_pool: asyncpg connection pool, or ``None`` while not connected
            (before ``setup_db``, after a failed connect, or after ``close``).
        polymarket: Client for the Polymarket REST/WebSocket APIs.
        kalshi: Client for the Kalshi REST/WebSocket APIs.
    """

    def __init__(self):
        self.db_pool = None
        self.polymarket = PolymarketClient(
            # The name is always defined (it is imported above), so the only
            # meaningful fallback is for an unset/empty configured value.
            rest_url=POLYMARKET_REST_API or DEFAULT_POLYMARKET_REST_API,
            ws_url=POLYMARKET_WS_API,
        )
        self.kalshi = KalshiClient(
            rest_url=KALSHI_REST_API,
            ws_url=KALSHI_WS_API,
            api_key=KALSHI_API_KEY,
            private_key=KALSHI_PRIVATE_KEY,
        )

    async def setup_db(self) -> None:
        """Create the TimescaleDB async connection pool.

        Connection failures are logged rather than raised; ``db_pool`` is left
        as ``None`` so that ``persist_tick`` becomes a no-op.
        """
        try:
            self.db_pool = await asyncpg.create_pool(DATABASE_URL)
        except Exception as exc:
            # Best-effort by design: the pipeline keeps running without a DB.
            logger.exception("Failed to connect to database: %s", exc)
            self.db_pool = None
        else:
            logger.info("Connected to TimescaleDB.")

    async def close(self) -> None:
        """Release the connection pool, if one is open. Safe to call twice."""
        # Detach first so concurrent persist_tick calls stop acquiring.
        pool, self.db_pool = self.db_pool, None
        if pool is None:
            return
        try:
            await asyncio.wait_for(pool.close(), timeout=_POOL_CLOSE_TIMEOUT)
        except asyncio.TimeoutError:
            # Graceful close waits for connections to be released; do not let
            # a stuck query block shutdown indefinitely.
            pool.terminate()

    async def persist_tick(
        self,
        timestamp: datetime,
        market_id: str,
        platform: str,
        event_name: str,
        outcome: str,
        bid_price: float,
        bid_size: float,
        ask_price: float,
        ask_size: float,
        mid_price: float,
        volume_24h: float,
    ) -> None:
        """Insert one market tick into the ``tick_data`` table.

        Does nothing when no pool is available. Rows that collide on
        ``(timestamp, market_id, platform, outcome)`` are skipped by the
        ``ON CONFLICT ... DO NOTHING`` clause.

        Raises:
            Whatever ``asyncpg`` raises while acquiring a connection or
            executing the insert; callers are expected to handle it.
        """
        pool = self.db_pool
        if pool is None:
            return

        async with pool.acquire() as conn:
            await conn.execute(
                _INSERT_TICK_SQL,
                timestamp,
                market_id,
                platform,
                event_name,
                outcome,
                bid_price,
                bid_size,
                ask_price,
                ask_size,
                mid_price,
                volume_24h,
            )

    async def handle_message(self, platform: str, message: dict) -> None:
        """Handle an incoming WebSocket message from Polymarket or Kalshi.

        The parsing below is placeholder/mock logic, not the real wire format:
        Polymarket entries are read from ``message["data"]`` and Kalshi
        messages are recognised by ``type == "orderbook_delta"``. Any error
        raised while parsing or persisting is logged and swallowed so the
        caller's receive loop is not interrupted.

        Args:
            platform: ``"polymarket"`` or ``"kalshi"``; other values are only
                logged.
            message: Decoded message payload.
        """
        # NOTE(review): the original notes say Polymarket messages carry
        # 'price', 'size', 'side' and Kalshi ones 'type' == 'orderbook_delta';
        # confirm against the real feeds before replacing the mock parsing.
        logger.info(
            "[%s] Message received: %s...", platform.upper(), str(message)[:100]
        )

        # Example dummy persistence: every tick in a message shares one
        # receive-time timestamp.
        now = datetime.now(timezone.utc)

        try:
            if platform == "polymarket" and "data" in message:
                # Mock parsing
                for item in message["data"]:
                    price = float(item.get("price", 0))
                    await self.persist_tick(
                        timestamp=now,
                        market_id=item.get("asset_id", "unknown"),
                        platform=platform,
                        event_name="Polymarket Market",
                        outcome="YES",
                        bid_price=price,
                        bid_size=float(item.get("size", 0)),
                        ask_price=0.0,
                        ask_size=0.0,
                        mid_price=price,
                        volume_24h=0.0,
                    )
            elif platform == "kalshi" and message.get("type") == "orderbook_delta":
                # Mock parsing
                await self.persist_tick(
                    timestamp=now,
                    market_id=message.get("market_ticker", "unknown"),
                    platform=platform,
                    event_name="Kalshi Market",
                    outcome="YES",
                    bid_price=0.0,
                    bid_size=0.0,
                    ask_price=0.0,
                    ask_size=0.0,
                    mid_price=0.0,
                    volume_24h=0.0,
                )
        except Exception as exc:
            # Boundary handler: one bad message must not kill the feed.
            logger.exception("Error persisting %s tick: %s", platform, exc)

    async def run(self) -> None:
        """Connect, subscribe to the example markets, and run until cancelled.

        The database pool is released on the way out, including when the task
        is cancelled (e.g. on Ctrl-C under ``asyncio.run``).
        """
        await self.setup_db()
        try:
            self.polymarket.set_callback(self.handle_message)
            self.kalshi.set_callback(self.handle_message)

            await self.polymarket.connect()

            # In a real run, you require the api key for Kalshi
            if KALSHI_API_KEY:
                await self.kalshi.connect()
            else:
                logger.warning(
                    "KALSHI_API_KEY not found. Skipping Kalshi WS connection."
                )

            # Example subscriptions: one market per platform.
            poly_markets = ["0x217..."]  # e.g., US Election Polymarket Token ID
            kalshi_markets = ["KXUS2024"]  # e.g., Presidential Kalshi

            await self.polymarket.subscribe(poly_markets)
            if KALSHI_API_KEY:
                await self.kalshi.subscribe(kalshi_markets)

            # Keep running
            while True:
                await asyncio.sleep(60)
        finally:
            await self.close()


if __name__ == "__main__":
    # NOTE(review): this rebinds the module-level name `logging` to whatever
    # src.config exposes under that name; src.config is already imported
    # above, so this looks redundant - confirm before removing.
    from src.config import logging

    # Run the ingestion script
    pipeline = DataIngestionPipeline()
    try:
        asyncio.run(pipeline.run())
    except KeyboardInterrupt:
        logger.info("Shutting down ingestion pipeline.")