Spaces:
Build error
Build error
| """Read CompactTick structs from shared memory written by Rust ingestion.""" | |
| from __future__ import annotations | |
| import mmap | |
| import struct | |
| import time | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import List, Optional | |
| from prediction_engine.graph_store.graph_contract import graph_key_for_market_hash | |
| MAGIC = 0x4B414E_54494B | |
| HEADER_FMT = "<QQQQ" # magic, capacity, write_count, reserved | |
| HEADER_SIZE = struct.calcsize(HEADER_FMT) | |
| TICK_FMT = "<BB6xddqQ" # venue, side, pad(6), price, size, timestamp_us, market_hash | |
| TICK_SIZE = struct.calcsize(TICK_FMT) | |
| VENUE_MAP = {0: "polymarket", 1: "kalshi", 2: "sportsbook"} | |
| SIDE_MAP = {0: "yes", 1: "no"} | |
| class CompactTick: | |
| venue: str | |
| side: str | |
| price: float | |
| size: float | |
| timestamp_us: int | |
| market_id_hash: int | |
| def semantic_key(self) -> str: | |
| return graph_key_for_market_hash( | |
| self.market_id_hash, | |
| venue=self.venue, | |
| side=self.side, | |
| ) | |
| class MmapReader: | |
| """Read market ticks from Rust-written shared memory.""" | |
| def __init__(self, path: str): | |
| self.path = Path(path) | |
| self._file = None | |
| self._mmap: Optional[mmap.mmap] = None | |
| self._last_count = 0 | |
| def open(self): | |
| self._file = open(self.path, "r+b") | |
| self._mmap = mmap.mmap(self._file.fileno(), 0) | |
| header = self._read_header() | |
| if header[0] != MAGIC: | |
| raise ValueError(f"Invalid magic: {header[0]:#x}, expected {MAGIC:#x}") | |
| def close(self): | |
| if self._mmap: | |
| self._mmap.close() | |
| if self._file: | |
| self._file.close() | |
| def __enter__(self): | |
| self.open() | |
| return self | |
| def __exit__(self, *args): | |
| self.close() | |
| def _read_header(self): | |
| self._mmap.seek(0) | |
| data = self._mmap.read(HEADER_SIZE) | |
| return struct.unpack(HEADER_FMT, data) | |
| def read_new_ticks(self) -> List[CompactTick]: | |
| """Read only ticks written since last call.""" | |
| _, capacity, write_count, _ = self._read_header() | |
| if write_count <= self._last_count: | |
| return [] | |
| n_new = min(int(write_count - self._last_count), int(capacity)) | |
| ticks = [] | |
| for i in range(n_new): | |
| idx = (int(write_count) - n_new + i) % int(capacity) | |
| offset = HEADER_SIZE + idx * TICK_SIZE | |
| self._mmap.seek(offset) | |
| data = self._mmap.read(TICK_SIZE) | |
| venue, side, price, size, ts, mhash = struct.unpack(TICK_FMT, data) | |
| ticks.append(CompactTick( | |
| venue=VENUE_MAP.get(venue, "unknown"), | |
| side=SIDE_MAP.get(side, "unknown"), | |
| price=price, | |
| size=size, | |
| timestamp_us=ts, | |
| market_id_hash=mhash, | |
| )) | |
| self._last_count = int(write_count) | |
| return ticks | |
| def poll(self, interval_ms: float = 10.0): | |
| """Generator that yields new ticks as they arrive.""" | |
| while True: | |
| ticks = self.read_new_ticks() | |
| if ticks: | |
| yield ticks | |
| else: | |
| time.sleep(interval_ms / 1000.0) | |