JohnGenetica's picture
Deploy ANE KAN runtime Space
201cf4d verified
"""Main orchestrator: read mmap ticks, score with KAN, detect signals."""
from __future__ import annotations
import argparse
import logging
import time
from typing import Dict, List
from prediction_engine.python_bridge.kan_scorer import KANFastScorer, MarketFeatureExtractor
from prediction_engine.python_bridge.mmap_reader import CompactTick, MmapReader
logger = logging.getLogger(__name__)
ARBITRAGE_THRESHOLD = 0.98
class SignalDetector:
"""Detect trading signals from market ticks."""
def __init__(self):
self.extractor = MarketFeatureExtractor()
self.scorer = KANFastScorer(in_features=6)
self.market_state: Dict[int, Dict[str, CompactTick]] = {}
def update(self, tick: CompactTick):
key = tick.market_id_hash
if key not in self.market_state:
self.market_state[key] = {}
venue_key = f"{tick.venue}_{tick.side}"
self.market_state[key][venue_key] = tick
def check_arbitrage(self, market_hash: int) -> dict | None:
state = self.market_state.get(market_hash, {})
yes_prices = {k: v.price for k, v in state.items() if "yes" in k}
no_prices = {k: v.price for k, v in state.items() if "no" in k}
for yes_venue, yes_price in yes_prices.items():
for no_venue, no_price in no_prices.items():
if yes_venue.split("_")[0] != no_venue.split("_")[0]:
total = yes_price + no_price
if total < ARBITRAGE_THRESHOLD:
return {
"type": "cross_venue_arbitrage",
"yes_venue": yes_venue,
"no_venue": no_venue,
"yes_price": yes_price,
"no_price": no_price,
"total_cost": total,
"guaranteed_profit": 1.0 - total,
}
return None
def process_ticks(self, ticks: List[CompactTick]) -> List[dict]:
signals = []
for tick in ticks:
self.update(tick)
arb = self.check_arbitrage(tick.market_id_hash)
if arb:
features = self.extractor.extract({
"yes_price": arb["yes_price"],
"no_price": arb["no_price"],
"spread": arb["guaranteed_profit"],
"volume_ratio": 1.0,
"time_to_event_hours": 24.0,
"venue_count": 2,
})
score = self.scorer.score(features)
arb["kan_score"] = score
signals.append(arb)
logger.info(
"ARBITRAGE: %s vs %s, profit=$%.4f, confidence=%.3f, edge=%s",
arb["yes_venue"], arb["no_venue"],
arb["guaranteed_profit"], score["confidence"],
score["edge_shape"],
)
return signals
def main():
parser = argparse.ArgumentParser(description="Prediction Engine Orchestrator")
parser.add_argument("--mmap-path", default="/tmp/prediction_ticks.mmap")
parser.add_argument("--poll-ms", type=float, default=10.0)
parser.add_argument("--test", action="store_true")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
if args.test:
logger.info("Smoke test mode")
detector = SignalDetector()
logger.info("SignalDetector initialized with KAN scorer")
logger.info("Orchestrator smoke test PASSED")
return
logger.info("Starting orchestrator, reading from %s", args.mmap_path)
detector = SignalDetector()
with MmapReader(args.mmap_path) as reader:
for ticks in reader.poll(interval_ms=args.poll_ms):
signals = detector.process_ticks(ticks)
for sig in signals:
logger.info("SIGNAL: %s", sig)
if __name__ == "__main__":
main()