File size: 3,202 Bytes
201cf4d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Read CompactTick structs from shared memory written by Rust ingestion."""
from __future__ import annotations

import mmap
import struct
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional

from prediction_engine.graph_store.graph_contract import graph_key_for_market_hash

MAGIC = 0x4B414E_54494B
HEADER_FMT = "<QQQQ"  # magic, capacity, write_count, reserved
HEADER_SIZE = struct.calcsize(HEADER_FMT)

TICK_FMT = "<BB6xddqQ"  # venue, side, pad(6), price, size, timestamp_us, market_hash
TICK_SIZE = struct.calcsize(TICK_FMT)

VENUE_MAP = {0: "polymarket", 1: "kalshi", 2: "sportsbook"}
SIDE_MAP = {0: "yes", 1: "no"}


@dataclass
class CompactTick:
    venue: str
    side: str
    price: float
    size: float
    timestamp_us: int
    market_id_hash: int

    @property
    def semantic_key(self) -> str:
        return graph_key_for_market_hash(
            self.market_id_hash,
            venue=self.venue,
            side=self.side,
        )


class MmapReader:
    """Read market ticks from Rust-written shared memory."""

    def __init__(self, path: str):
        self.path = Path(path)
        self._file = None
        self._mmap: Optional[mmap.mmap] = None
        self._last_count = 0

    def open(self):
        self._file = open(self.path, "r+b")
        self._mmap = mmap.mmap(self._file.fileno(), 0)
        header = self._read_header()
        if header[0] != MAGIC:
            raise ValueError(f"Invalid magic: {header[0]:#x}, expected {MAGIC:#x}")

    def close(self):
        if self._mmap:
            self._mmap.close()
        if self._file:
            self._file.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, *args):
        self.close()

    def _read_header(self):
        self._mmap.seek(0)
        data = self._mmap.read(HEADER_SIZE)
        return struct.unpack(HEADER_FMT, data)

    def read_new_ticks(self) -> List[CompactTick]:
        """Read only ticks written since last call."""
        _, capacity, write_count, _ = self._read_header()

        if write_count <= self._last_count:
            return []

        n_new = min(int(write_count - self._last_count), int(capacity))
        ticks = []

        for i in range(n_new):
            idx = (int(write_count) - n_new + i) % int(capacity)
            offset = HEADER_SIZE + idx * TICK_SIZE
            self._mmap.seek(offset)
            data = self._mmap.read(TICK_SIZE)
            venue, side, price, size, ts, mhash = struct.unpack(TICK_FMT, data)
            ticks.append(CompactTick(
                venue=VENUE_MAP.get(venue, "unknown"),
                side=SIDE_MAP.get(side, "unknown"),
                price=price,
                size=size,
                timestamp_us=ts,
                market_id_hash=mhash,
            ))

        self._last_count = int(write_count)
        return ticks

    def poll(self, interval_ms: float = 10.0):
        """Generator that yields new ticks as they arrive."""
        while True:
            ticks = self.read_new_ticks()
            if ticks:
                yield ticks
            else:
                time.sleep(interval_ms / 1000.0)