import asyncio
import logging
import os
from datetime import datetime, timezone
from typing import Optional, Sequence

import asyncpg

from src.config import (
    POLYMARKET_REST_API, POLYMARKET_WS_API,
    KALSHI_REST_API, KALSHI_WS_API,
    KALSHI_API_KEY, KALSHI_PRIVATE_KEY,
    DATABASE_URL
)
from src.clients.polymarket import PolymarketClient
from src.clients.kalshi import KalshiClient


# Module-level logger shared by the pipeline class and the script entry point.
logger = logging.getLogger(__name__)


class DataIngestionPipeline:
    """Stream market data from Polymarket and Kalshi into the ``tick_data`` table.

    The pipeline owns one client per platform and an optional asyncpg
    connection pool.  Both clients are given :meth:`handle_message` as their
    callback; each recognised message is converted into one or more rows and
    written through :meth:`persist_tick`.

    If the database connection cannot be established the pipeline keeps
    running, but every tick is dropped (see :meth:`setup_db`).
    """

    # Used when the configured Polymarket REST URL is empty or None.
    DEFAULT_POLYMARKET_REST_URL = "https://gamma-api.polymarket.com"

    # Markets subscribed to when ``run()`` is called without arguments.
    # NOTE(review): "0x217..." looks like a truncated placeholder id — confirm.
    DEFAULT_POLY_MARKETS = ("0x217...",)
    DEFAULT_KALSHI_MARKETS = ("KXUS2024",)

    # Seconds to wait for a graceful pool shutdown in ``close_db()``.
    DB_CLOSE_TIMEOUT = 10.0

    # NOTE(review): every row produced from one incoming message shares the
    # same timestamp, so two entries for the same market inside one message
    # collide on the conflict key below and only the first is stored.
    _INSERT_TICK_SQL = """
        INSERT INTO tick_data (
            timestamp, market_id, platform, event_name, outcome,
            bid_price, bid_size, ask_price, ask_size, mid_price, volume_24h
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
        ON CONFLICT (timestamp, market_id, platform, outcome) DO NOTHING;
    """

    def __init__(self):
        """Create both platform clients; the database pool is opened later."""
        self.db_pool = None
        self.polymarket = PolymarketClient(
            # The name is always defined once the module-level import has
            # succeeded, so fall back on an empty value rather than on a
            # missing name.
            rest_url=POLYMARKET_REST_API or self.DEFAULT_POLYMARKET_REST_URL,
            ws_url=POLYMARKET_WS_API,
        )
        self.kalshi = KalshiClient(
            rest_url=KALSHI_REST_API,
            ws_url=KALSHI_WS_API,
            api_key=KALSHI_API_KEY,
            private_key=KALSHI_PRIVATE_KEY,
        )

    async def setup_db(self):
        """Open the asyncpg connection pool for ``DATABASE_URL``.

        Failure is deliberately non-fatal: the error is logged, ``db_pool``
        stays ``None`` and :meth:`persist_tick` becomes a no-op.
        """
        try:
            self.db_pool = await asyncpg.create_pool(DATABASE_URL)
            logger.info("Connected to TimescaleDB.")
        except Exception as e:
            logger.error("Failed to connect to database: %s", e)
            self.db_pool = None

    async def close_db(self, timeout: Optional[float] = None):
        """Close the connection pool, if one is open.

        Args:
            timeout: Seconds to wait for a graceful close.  Defaults to
                ``DB_CLOSE_TIMEOUT``.

        Safe to call repeatedly and when no pool was ever created.
        """
        # Detach the pool first so concurrent persist_tick calls stop using it.
        pool, self.db_pool = self.db_pool, None
        if pool is None:
            return
        if timeout is None:
            timeout = self.DB_CLOSE_TIMEOUT
        try:
            await asyncio.wait_for(pool.close(), timeout=timeout)
        except asyncio.TimeoutError:
            # asyncpg terminates the pool itself when close() is cancelled,
            # which is what wait_for does on timeout.
            logger.warning("Timed out after %.1fs closing the database pool.", timeout)

    async def persist_tick(self, timestamp: datetime, market_id: str, platform: str, event_name: str, outcome: str,
                           bid_price: float, bid_size: float, ask_price: float, ask_size: float, mid_price: float,
                           volume_24h: float):
        """Insert one market tick into the ``tick_data`` table.

        Rows that collide on ``(timestamp, market_id, platform, outcome)`` are
        silently skipped by the ``ON CONFLICT ... DO NOTHING`` clause.  Does
        nothing when no database pool is available.

        Raises:
            Exception: whatever the database driver raises while acquiring a
                connection or executing the insert.
        """
        if self.db_pool is None:
            return

        async with self.db_pool.acquire() as conn:
            await conn.execute(
                self._INSERT_TICK_SQL,
                timestamp, market_id, platform, event_name, outcome,
                bid_price, bid_size, ask_price, ask_size, mid_price, volume_24h,
            )

    @staticmethod
    def _polymarket_tick(item: dict) -> dict:
        """Map one entry of a Polymarket ``data`` list to ``persist_tick`` kwargs.

        The entry's ``price`` is stored as both bid and mid price; the ask
        side and 24h volume are not available here and are stored as zero.

        Raises:
            TypeError, ValueError: if ``price`` or ``size`` is not numeric.
            AttributeError: if ``item`` is not a mapping.
        """
        price = float(item.get("price", 0))
        return {
            "market_id": item.get("asset_id", "unknown"),
            "platform": "polymarket",
            "event_name": "Polymarket Market",
            "outcome": "YES",
            "bid_price": price,
            "bid_size": float(item.get("size", 0)),
            "ask_price": 0.0,
            "ask_size": 0.0,
            "mid_price": price,
            "volume_24h": 0.0,
        }

    @staticmethod
    def _kalshi_tick(message: dict) -> dict:
        """Map a Kalshi ``orderbook_delta`` message to ``persist_tick`` kwargs.

        NOTE(review): the delta's price/size payload is not parsed; every
        numeric column is stored as zero, so the row only records that an
        update for the market was seen.
        """
        return {
            "market_id": message.get("market_ticker", "unknown"),
            "platform": "kalshi",
            "event_name": "Kalshi Market",
            "outcome": "YES",
            "bid_price": 0.0,
            "bid_size": 0.0,
            "ask_price": 0.0,
            "ask_size": 0.0,
            "mid_price": 0.0,
            "volume_24h": 0.0,
        }

    async def handle_message(self, platform: str, message: dict):
        """Handle one incoming WebSocket message from Polymarket or Kalshi.

        Recognised shapes:

        * ``platform == "polymarket"`` with a ``"data"`` key — one tick per
          entry of the ``data`` list.
        * ``platform == "kalshi"`` with ``type == "orderbook_delta"`` — one tick.

        Anything else is logged and ignored.  Parsing and database errors are
        logged per tick and never propagate, so one bad entry does not
        prevent the remaining entries of the same message from being stored.
        """
        logger.info("[%s] Message received: %s...", platform.upper(), str(message)[:100])

        if not isinstance(message, dict):
            logger.error(
                "Error persisting %s tick: unexpected message type %s",
                platform, type(message).__name__,
            )
            return

        if platform == "polymarket" and "data" in message:
            # Treat an explicit ``"data": null`` like an empty batch.
            items = message.get("data") or []
            if not isinstance(items, (list, tuple)):
                logger.error(
                    "Error persisting %s tick: 'data' is %s, expected a list",
                    platform, type(items).__name__,
                )
                return
            pending = [(self._polymarket_tick, item) for item in items]
        elif platform == "kalshi" and message.get("type") == "orderbook_delta":
            pending = [(self._kalshi_tick, message)]
        else:
            return

        # One receive time for the whole message.
        now = datetime.now(timezone.utc)
        for build, payload in pending:
            try:
                await self.persist_tick(timestamp=now, **build(payload))
            except Exception as e:
                logger.exception("Error persisting %s tick: %s", platform, e)

    async def run(self, poly_markets: Optional[Sequence[str]] = None,
                  kalshi_markets: Optional[Sequence[str]] = None):
        """Connect, subscribe and keep the pipeline alive until cancelled.

        Args:
            poly_markets: Polymarket ids to subscribe to.  Defaults to
                ``DEFAULT_POLY_MARKETS``.
            kalshi_markets: Kalshi tickers to subscribe to.  Defaults to
                ``DEFAULT_KALSHI_MARKETS``.  Ignored when ``KALSHI_API_KEY``
                is not configured.

        This coroutine never returns normally.  The database pool is closed
        when it is cancelled or fails.
        """
        poly = list(self.DEFAULT_POLY_MARKETS if poly_markets is None else poly_markets)
        kalshi = list(self.DEFAULT_KALSHI_MARKETS if kalshi_markets is None else kalshi_markets)

        await self.setup_db()
        try:
            self.polymarket.set_callback(self.handle_message)
            self.kalshi.set_callback(self.handle_message)

            await self.polymarket.connect()

            kalshi_enabled = bool(KALSHI_API_KEY)
            if kalshi_enabled:
                await self.kalshi.connect()
            else:
                logger.warning("KALSHI_API_KEY not found. Skipping Kalshi WS connection.")

            await self.polymarket.subscribe(poly)
            if kalshi_enabled:
                await self.kalshi.subscribe(kalshi)

            # Keep the coroutine (and therefore the event loop) alive.
            # NOTE(review): presumably the clients deliver messages from their
            # own background tasks — confirm against the client classes.
            while True:
                await asyncio.sleep(60)
        finally:
            await self.close_db()


def main() -> None:
    """Run the ingestion pipeline until it is interrupted with Ctrl-C."""
    pipeline = DataIngestionPipeline()
    try:
        asyncio.run(pipeline.run())
    except KeyboardInterrupt:
        logger.info("Shutting down ingestion pipeline.")


if __name__ == "__main__":
    # NOTE(review): kept from the original in case importing this name has
    # set-up side effects; aliased so it no longer rebinds the stdlib
    # ``logging`` module imported at the top of the file — confirm whether it
    # is still needed.
    from src.config import logging as _config_logging  # noqa: F401

    main()