JohnGenetica's picture
Deploy ANE KAN runtime Space
201cf4d verified
"""Read CompactTick structs from shared memory written by Rust ingestion."""
from __future__ import annotations
import mmap
import struct
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
from prediction_engine.graph_store.graph_contract import graph_key_for_market_hash
MAGIC = 0x4B414E_54494B
HEADER_FMT = "<QQQQ" # magic, capacity, write_count, reserved
HEADER_SIZE = struct.calcsize(HEADER_FMT)
TICK_FMT = "<BB6xddqQ" # venue, side, pad(6), price, size, timestamp_us, market_hash
TICK_SIZE = struct.calcsize(TICK_FMT)
VENUE_MAP = {0: "polymarket", 1: "kalshi", 2: "sportsbook"}
SIDE_MAP = {0: "yes", 1: "no"}
@dataclass
class CompactTick:
venue: str
side: str
price: float
size: float
timestamp_us: int
market_id_hash: int
@property
def semantic_key(self) -> str:
return graph_key_for_market_hash(
self.market_id_hash,
venue=self.venue,
side=self.side,
)
class MmapReader:
"""Read market ticks from Rust-written shared memory."""
def __init__(self, path: str):
self.path = Path(path)
self._file = None
self._mmap: Optional[mmap.mmap] = None
self._last_count = 0
def open(self):
self._file = open(self.path, "r+b")
self._mmap = mmap.mmap(self._file.fileno(), 0)
header = self._read_header()
if header[0] != MAGIC:
raise ValueError(f"Invalid magic: {header[0]:#x}, expected {MAGIC:#x}")
def close(self):
if self._mmap:
self._mmap.close()
if self._file:
self._file.close()
def __enter__(self):
self.open()
return self
def __exit__(self, *args):
self.close()
def _read_header(self):
self._mmap.seek(0)
data = self._mmap.read(HEADER_SIZE)
return struct.unpack(HEADER_FMT, data)
def read_new_ticks(self) -> List[CompactTick]:
"""Read only ticks written since last call."""
_, capacity, write_count, _ = self._read_header()
if write_count <= self._last_count:
return []
n_new = min(int(write_count - self._last_count), int(capacity))
ticks = []
for i in range(n_new):
idx = (int(write_count) - n_new + i) % int(capacity)
offset = HEADER_SIZE + idx * TICK_SIZE
self._mmap.seek(offset)
data = self._mmap.read(TICK_SIZE)
venue, side, price, size, ts, mhash = struct.unpack(TICK_FMT, data)
ticks.append(CompactTick(
venue=VENUE_MAP.get(venue, "unknown"),
side=SIDE_MAP.get(side, "unknown"),
price=price,
size=size,
timestamp_us=ts,
market_id_hash=mhash,
))
self._last_count = int(write_count)
return ticks
def poll(self, interval_ms: float = 10.0):
"""Generator that yields new ticks as they arrive."""
while True:
ticks = self.read_new_ticks()
if ticks:
yield ticks
else:
time.sleep(interval_ms / 1000.0)