Spaces:
Build error
Build error
File size: 3,202 Bytes
201cf4d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 | """Read CompactTick structs from shared memory written by Rust ingestion."""
from __future__ import annotations
import mmap
import struct
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
from prediction_engine.graph_store.graph_contract import graph_key_for_market_hash
# Value expected in the first header field; MmapReader.open() rejects a file
# whose header starts with anything else. The hex digits are the ASCII codes
# for "KANTIK".
MAGIC = 0x4B41_4E54_494B

# Header layout: four little-endian unsigned 64-bit integers, in order
#   magic, capacity, write_count, reserved
HEADER_FMT = "<QQQQ"
HEADER_SIZE = struct.calcsize(HEADER_FMT)

# Tick record layout (little-endian, no implicit alignment), in order
#   venue (u8), side (u8), 6 pad bytes, price (f64), size (f64),
#   timestamp_us (i64), market_hash (u64)
TICK_FMT = "<BB6xddqQ"
TICK_SIZE = struct.calcsize(TICK_FMT)

# Numeric codes stored in a tick record -> names exposed on CompactTick.
VENUE_MAP = dict(enumerate(("polymarket", "kalshi", "sportsbook")))
SIDE_MAP = dict(enumerate(("yes", "no")))
@dataclass
class CompactTick:
    """A single tick record decoded from the shared-memory ring.

    ``venue`` and ``side`` hold the names looked up from the record's numeric
    codes; the remaining fields are the record's values as unpacked.
    """

    venue: str
    side: str
    price: float
    size: float
    timestamp_us: int  # presumably microseconds, per the field name -- confirm with the writer
    market_id_hash: int

    @property
    def semantic_key(self) -> str:
        """Return the graph key for this tick's market hash, venue and side."""
        qualifiers = {"venue": self.venue, "side": self.side}
        return graph_key_for_market_hash(self.market_id_hash, **qualifiers)
class MmapReader:
    """Read market ticks from Rust-written shared memory.

    The mapped file is read as a fixed header (``HEADER_FMT``) followed by a
    ring of ``capacity`` tick records (``TICK_FMT``). The header's
    ``write_count`` is compared with the count seen on the previous call so
    that each call to :meth:`read_new_ticks` returns only unseen records.

    Usable as a context manager: entering opens the mapping, leaving closes it.

    NOTE(review): nothing here synchronises with the writer, so a record that
    is overwritten while being read could decode to mixed old/new fields --
    confirm the writer's publication protocol.
    """

    def __init__(self, path: str):
        """Remember ``path``; nothing is opened until :meth:`open` is called."""
        self.path = Path(path)
        self._file = None
        self._mmap: Optional[mmap.mmap] = None
        self._last_count = 0

    def open(self):
        """Map the file and validate the header's magic field.

        Any mapping left over from a previous ``open()`` is released first.
        If mapping or validation fails, the file handle and mapping are
        closed before the exception propagates, so a failed ``open()`` (or a
        failed ``with`` entry) does not leak resources.

        Raises:
            OSError: if the file cannot be opened.
            ValueError: if the file is empty or the magic field is not ``MAGIC``.
            struct.error: if the file is shorter than the header.
        """
        self.close()
        self._file = open(self.path, "r+b")
        try:
            self._mmap = mmap.mmap(self._file.fileno(), 0)
            header = self._read_header()
            if header[0] != MAGIC:
                raise ValueError(f"Invalid magic: {header[0]:#x}, expected {MAGIC:#x}")
        except Exception:
            # __exit__ is never called when __enter__ raises, so clean up here.
            self.close()
            raise

    def close(self):
        """Release the mapping and the file handle; safe to call repeatedly."""
        # Detach both references before closing. They are compared against
        # None rather than tested for truthiness: bool() of an mmap goes
        # through len(), which raises ValueError once the map is closed.
        mapped, self._mmap = self._mmap, None
        handle, self._file = self._file, None
        try:
            if mapped is not None:
                mapped.close()
        finally:
            if handle is not None:
                handle.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *args):
        self.close()

    def _require_open(self) -> mmap.mmap:
        """Return the live mapping, or raise ``ValueError`` if not open."""
        if self._mmap is None:
            raise ValueError("MmapReader is not open")
        return self._mmap

    def _read_header(self):
        """Return the header as ``(magic, capacity, write_count, reserved)``."""
        # unpack_from reads in place: no copy, and the mapping's own
        # seek position is left alone.
        return struct.unpack_from(HEADER_FMT, self._require_open(), 0)

    def read_new_ticks(self) -> List[CompactTick]:
        """Read only ticks written since last call.

        If more than ``capacity`` records were written since the previous
        call, only the most recent ``capacity`` records are returned; the
        older ones have already been overwritten in the ring. Venue or side
        codes missing from ``VENUE_MAP`` / ``SIDE_MAP`` decode as "unknown".

        Returns:
            The new ticks, oldest first; an empty list when ``write_count``
            has not advanced past the last observed value.

        Raises:
            ValueError: if the reader is not open.
            struct.error: if a record lies outside the mapped region.
        """
        buf = self._require_open()
        _, capacity, write_count, _ = self._read_header()
        if write_count <= self._last_count:
            return []

        # Cap at capacity: anything older than one full ring is gone.
        n_new = min(write_count - self._last_count, capacity)

        ticks = []
        # seq is the record's absolute sequence number; its slot in the ring
        # is seq % capacity. With capacity == 0 the range is empty, so the
        # modulo is never evaluated.
        for seq in range(write_count - n_new, write_count):
            offset = HEADER_SIZE + (seq % capacity) * TICK_SIZE
            venue, side, price, size, ts, mhash = struct.unpack_from(TICK_FMT, buf, offset)
            ticks.append(CompactTick(
                venue=VENUE_MAP.get(venue, "unknown"),
                side=SIDE_MAP.get(side, "unknown"),
                price=price,
                size=size,
                timestamp_us=ts,
                market_id_hash=mhash,
            ))

        self._last_count = write_count
        return ticks

    def poll(self, interval_ms: float = 10.0):
        """Generator that yields new ticks as they arrive.

        Yields each non-empty batch from :meth:`read_new_ticks`; when a read
        returns nothing, sleeps ``interval_ms`` milliseconds before retrying.
        The loop never ends on its own -- the consumer stops iterating.
        """
        while True:
            ticks = self.read_new_ticks()
            if ticks:
                yield ticks
            else:
                time.sleep(interval_ms / 1000.0)
|