# arbintel/src/ingestion.py
# AJAY KASU
# Add root app.py for Streamlit GUI and dependencies
# 77fd2f6
import asyncio
import logging
import asyncpg
from datetime import datetime, timezone
import os
from src.config import (
POLYMARKET_REST_API, POLYMARKET_WS_API,
KALSHI_REST_API, KALSHI_WS_API,
KALSHI_API_KEY, KALSHI_PRIVATE_KEY,
DATABASE_URL
)
from src.clients.polymarket import PolymarketClient
from src.clients.kalshi import KalshiClient
# Module-level logger named after this module; used by the pipeline class and
# by the script entry point at the bottom of the file.
logger = logging.getLogger(__name__)
class DataIngestionPipeline:
    """Receive Polymarket/Kalshi WebSocket messages and store ticks in ``tick_data``.

    The pipeline owns one client per platform plus an optional asyncpg
    connection pool.  If the pool could not be created, ``persist_tick``
    becomes a no-op, so messages are still received and logged but not stored.
    """

    # Used when the configured Polymarket REST URL is empty or None.
    _DEFAULT_POLYMARKET_REST_API = "https://gamma-api.polymarket.com"

    def __init__(self):
        """Create both platform clients; the database pool is opened by ``setup_db``."""
        self.db_pool = None
        self.polymarket = PolymarketClient(
            # POLYMARKET_REST_API is always defined (it is imported at module
            # level), so the fallback has to key off its value, not its
            # presence in globals().
            rest_url=POLYMARKET_REST_API or self._DEFAULT_POLYMARKET_REST_API,
            ws_url=POLYMARKET_WS_API,
        )
        self.kalshi = KalshiClient(
            rest_url=KALSHI_REST_API,
            ws_url=KALSHI_WS_API,
            api_key=KALSHI_API_KEY,
            private_key=KALSHI_PRIVATE_KEY,
        )

    async def setup_db(self) -> None:
        """Setup TimescaleDB async connection pool.

        Best-effort: on any failure the error is logged and ``self.db_pool``
        is left as ``None`` instead of raising.
        """
        try:
            self.db_pool = await asyncpg.create_pool(DATABASE_URL)
            logger.info("Connected to TimescaleDB.")
        except Exception as e:
            logger.error("Failed to connect to database: %s", e)
            self.db_pool = None

    async def persist_tick(self, timestamp: datetime, market_id: str, platform: str, event_name: str, outcome: str,
                           bid_price: float, bid_size: float, ask_price: float, ask_size: float, mid_price: float,
                           volume_24h: float) -> None:
        """Insert market tick into 'tick_data' table.

        Does nothing when no database pool is available.  Rows that collide
        on ``(timestamp, market_id, platform, outcome)`` are skipped by the
        ``ON CONFLICT ... DO NOTHING`` clause.

        Args:
            timestamp: Time of the tick.
            market_id: Platform-specific market identifier.
            platform: Source platform name (e.g. ``"polymarket"``, ``"kalshi"``).
            event_name: Human-readable event label.
            outcome: Outcome the quote refers to (e.g. ``"YES"``).
            bid_price: Best bid price.
            bid_size: Size at the best bid.
            ask_price: Best ask price.
            ask_size: Size at the best ask.
            mid_price: Mid price.
            volume_24h: Trailing 24-hour volume.

        Raises:
            Exception: Whatever the pool/connection raises while executing
                the insert; errors are not swallowed here.
        """
        if self.db_pool is None:
            return
        query = """
            INSERT INTO tick_data (
                timestamp, market_id, platform, event_name, outcome,
                bid_price, bid_size, ask_price, ask_size, mid_price, volume_24h
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
            ON CONFLICT (timestamp, market_id, platform, outcome) DO NOTHING;
        """
        async with self.db_pool.acquire() as conn:
            await conn.execute(query, timestamp, market_id, platform, event_name, outcome,
                               bid_price, bid_size, ask_price, ask_size, mid_price, volume_24h)

    async def _persist_polymarket_item(self, now: datetime, platform: str, item) -> None:
        """Persist one entry of a Polymarket ``data`` list; log and skip it on any error."""
        try:
            price = float(item.get("price", 0))
            size = float(item.get("size", 0))
            await self.persist_tick(
                timestamp=now, market_id=item.get("asset_id", "unknown"),
                platform=platform, event_name="Polymarket Market",
                outcome="YES",
                bid_price=price, bid_size=size,
                ask_price=0.0, ask_size=0.0, mid_price=price, volume_24h=0.0,
            )
        except Exception as e:
            # Isolate failures per item so one malformed entry does not drop
            # the remaining entries of the same message.
            logger.error("Error persisting %s tick: %s", platform, e)

    async def handle_message(self, platform: str, message: dict):
        """Handle incoming WebSocket data from Polymarket or Kalshi.

        The field mapping below is placeholder parsing, not the platforms'
        exact schemas:

        * Polymarket: every entry of ``message["data"]`` is stored using its
          ``asset_id``, ``price`` and ``size`` keys.
        * Kalshi: a message whose ``type`` is ``"orderbook_delta"`` is stored
          with zeroed prices under its ``market_ticker``.

        Any other message is only logged.  Parsing and persistence errors are
        logged and never propagate to the caller.

        Args:
            platform: ``"polymarket"`` or ``"kalshi"``.
            message: Decoded WebSocket payload.
        """
        logger.info("[%s] Message received: %s...", platform.upper(), str(message)[:100])

        # NOTE(review): every tick from one message shares this timestamp, so
        # two entries for the same market/outcome collide on the table's
        # conflict key and only the first is stored -- confirm this is intended.
        now = datetime.now(timezone.utc)
        try:
            if platform == "polymarket" and "data" in message:
                # `or []` covers a payload where "data" is present but null.
                for item in message.get("data") or []:
                    await self._persist_polymarket_item(now, platform, item)
            elif platform == "kalshi" and message.get("type") == "orderbook_delta":
                await self.persist_tick(
                    timestamp=now, market_id=message.get("market_ticker", "unknown"),
                    platform=platform, event_name="Kalshi Market",
                    outcome="YES",
                    bid_price=0.0, bid_size=0.0,
                    ask_price=0.0, ask_size=0.0, mid_price=0.0, volume_24h=0.0,
                )
        except Exception as e:
            logger.error("Error persisting %s tick: %s", platform, e)

    async def run(self, poly_markets=None, kalshi_markets=None):
        """Connect, subscribe and keep the pipeline alive until cancelled.

        Kalshi is only connected and subscribed when ``KALSHI_API_KEY`` is
        set.  The method never returns normally; when it is cancelled or
        fails, the database pool (if any) is closed before the exception
        propagates.

        Args:
            poly_markets: Polymarket identifiers to subscribe to.  Defaults to
                the single example token id ``"0x217..."``.
            kalshi_markets: Kalshi tickers to subscribe to.  Defaults to
                ``["KXUS2024"]``.
        """
        if poly_markets is None:
            poly_markets = ["0x217..."]  # e.g., US Election Polymarket Token ID
        if kalshi_markets is None:
            kalshi_markets = ["KXUS2024"]  # e.g., Presidential Kalshi

        await self.setup_db()
        try:
            self.polymarket.set_callback(self.handle_message)
            self.kalshi.set_callback(self.handle_message)

            kalshi_enabled = bool(KALSHI_API_KEY)

            await self.polymarket.connect()
            if kalshi_enabled:
                await self.kalshi.connect()
            else:
                logger.warning("KALSHI_API_KEY not found. Skipping Kalshi WS connection.")

            await self.polymarket.subscribe(poly_markets)
            if kalshi_enabled:
                await self.kalshi.subscribe(kalshi_markets)

            # Keep running
            while True:
                await asyncio.sleep(60)
        finally:
            # Release database connections on shutdown (Ctrl-C cancels this
            # task when it is driven by asyncio.run) or on a startup failure.
            if self.db_pool is not None:
                await self.db_pool.close()
                self.db_pool = None
if __name__ == "__main__":
    # Script entry point.  `logging` is re-imported through src.config before
    # the pipeline starts -- presumably so that module's logging setup is in
    # effect; verify against src/config.py.
    from src.config import logging

    ingestion = DataIngestionPipeline()
    main_coro = ingestion.run()
    try:
        # Drive the pipeline on a fresh event loop until it is interrupted.
        asyncio.run(main_coro)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the script; log it instead of
        # letting the traceback surface.
        logger.info("Shutting down ingestion pipeline.")