"""Read CompactTick structs from shared memory written by Rust ingestion.""" from __future__ import annotations import mmap import struct import time from dataclasses import dataclass from pathlib import Path from typing import List, Optional from prediction_engine.graph_store.graph_contract import graph_key_for_market_hash MAGIC = 0x4B414E_54494B HEADER_FMT = " str: return graph_key_for_market_hash( self.market_id_hash, venue=self.venue, side=self.side, ) class MmapReader: """Read market ticks from Rust-written shared memory.""" def __init__(self, path: str): self.path = Path(path) self._file = None self._mmap: Optional[mmap.mmap] = None self._last_count = 0 def open(self): self._file = open(self.path, "r+b") self._mmap = mmap.mmap(self._file.fileno(), 0) header = self._read_header() if header[0] != MAGIC: raise ValueError(f"Invalid magic: {header[0]:#x}, expected {MAGIC:#x}") def close(self): if self._mmap: self._mmap.close() if self._file: self._file.close() def __enter__(self): self.open() return self def __exit__(self, *args): self.close() def _read_header(self): self._mmap.seek(0) data = self._mmap.read(HEADER_SIZE) return struct.unpack(HEADER_FMT, data) def read_new_ticks(self) -> List[CompactTick]: """Read only ticks written since last call.""" _, capacity, write_count, _ = self._read_header() if write_count <= self._last_count: return [] n_new = min(int(write_count - self._last_count), int(capacity)) ticks = [] for i in range(n_new): idx = (int(write_count) - n_new + i) % int(capacity) offset = HEADER_SIZE + idx * TICK_SIZE self._mmap.seek(offset) data = self._mmap.read(TICK_SIZE) venue, side, price, size, ts, mhash = struct.unpack(TICK_FMT, data) ticks.append(CompactTick( venue=VENUE_MAP.get(venue, "unknown"), side=SIDE_MAP.get(side, "unknown"), price=price, size=size, timestamp_us=ts, market_id_hash=mhash, )) self._last_count = int(write_count) return ticks def poll(self, interval_ms: float = 10.0): """Generator that yields new ticks as they arrive.""" while True: ticks = self.read_new_ticks() if ticks: yield ticks else: time.sleep(interval_ms / 1000.0)