Marketmind / engine /order_book.py
ARKAISW's picture
Initial MarketMind commit (Corrected Author)
10150dc
"""
Continuous Double Auction (CDA) Matching Engine.
Implements a limit order book with price-time priority matching.
This is the core market mechanism — all agent orders flow through here.
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class Side(Enum):
BUY = "buy"
SELL = "sell"
@dataclass
class Order:
"""A single limit order submitted by an agent."""
agent_id: str
side: Side
price: float
quantity: int
timestamp: int # tick number — used for time priority
def __post_init__(self):
if self.quantity <= 0:
raise ValueError(f"Order quantity must be positive, got {self.quantity}")
if self.price <= 0:
raise ValueError(f"Order price must be positive, got {self.price}")
@dataclass
class Trade:
"""A completed trade between two orders."""
tick: int
price: float
quantity: int
buyer_id: str
seller_id: str
aggressor_side: Side # who crossed the spread
@property
def value(self) -> float:
return self.price * self.quantity
class OrderBook:
"""
Limit order book with price-time priority matching.
Bids sorted descending (best bid = highest price first).
Asks sorted ascending (best ask = lowest price first).
Within same price level, earlier orders match first (FIFO).
"""
def __init__(self):
# Lists of Order, kept sorted after each insertion
self.bids: list[Order] = [] # sorted: highest price first, then earliest timestamp
self.asks: list[Order] = [] # sorted: lowest price first, then earliest timestamp
self.trade_log: list[Trade] = []
self._tick: int = 0
@property
def best_bid(self) -> Optional[float]:
"""Highest bid price, or None if no bids."""
return self.bids[0].price if self.bids else None
@property
def best_ask(self) -> Optional[float]:
"""Lowest ask price, or None if no asks."""
return self.asks[0].price if self.asks else None
@property
def mid_price(self) -> Optional[float]:
"""Midpoint between best bid and best ask, or None if either side is empty."""
if self.best_bid is not None and self.best_ask is not None:
return (self.best_bid + self.best_ask) / 2.0
return None
@property
def spread(self) -> Optional[float]:
"""Bid-ask spread, or None if either side is empty."""
if self.best_bid is not None and self.best_ask is not None:
return self.best_ask - self.best_bid
return None
def set_tick(self, tick: int):
"""Advance the internal tick clock. Called by the simulation loop."""
self._tick = tick
def submit_order(self, order: Order) -> list[Trade]:
"""
Submit an order to the book. Attempts to match immediately.
Any unmatched residual rests in the book.
Returns list of trades executed by this order (possibly empty).
"""
trades: list[Trade] = []
if order.side == Side.BUY:
trades = self._match_buy(order)
elif order.side == Side.SELL:
trades = self._match_sell(order)
self.trade_log.extend(trades)
return trades
def _match_buy(self, buy_order: Order) -> list[Trade]:
"""Match an incoming buy order against resting asks."""
trades: list[Trade] = []
remaining_qty = buy_order.quantity
while remaining_qty > 0 and self.asks:
best_ask_order = self.asks[0]
# Buy can only match if its price >= best ask price
if buy_order.price < best_ask_order.price:
break
# Determine fill quantity
fill_qty = min(remaining_qty, best_ask_order.quantity)
fill_price = best_ask_order.price # price-time priority: passive order's price
trade = Trade(
tick=self._tick,
price=fill_price,
quantity=fill_qty,
buyer_id=buy_order.agent_id,
seller_id=best_ask_order.agent_id,
aggressor_side=Side.BUY,
)
trades.append(trade)
remaining_qty -= fill_qty
best_ask_order.quantity -= fill_qty
# Remove fully filled ask
if best_ask_order.quantity == 0:
self.asks.pop(0)
# Rest any unfilled portion in the bid book
if remaining_qty > 0:
resting_order = Order(
agent_id=buy_order.agent_id,
side=Side.BUY,
price=buy_order.price,
quantity=remaining_qty,
timestamp=buy_order.timestamp,
)
self._insert_bid(resting_order)
return trades
def _match_sell(self, sell_order: Order) -> list[Trade]:
"""Match an incoming sell order against resting bids."""
trades: list[Trade] = []
remaining_qty = sell_order.quantity
while remaining_qty > 0 and self.bids:
best_bid_order = self.bids[0]
# Sell can only match if its price <= best bid price
if sell_order.price > best_bid_order.price:
break
# Determine fill quantity
fill_qty = min(remaining_qty, best_bid_order.quantity)
fill_price = best_bid_order.price # passive order's price
trade = Trade(
tick=self._tick,
price=fill_price,
quantity=fill_qty,
buyer_id=best_bid_order.agent_id,
seller_id=sell_order.agent_id,
aggressor_side=Side.SELL,
)
trades.append(trade)
remaining_qty -= fill_qty
best_bid_order.quantity -= fill_qty
# Remove fully filled bid
if best_bid_order.quantity == 0:
self.bids.pop(0)
# Rest any unfilled portion in the ask book
if remaining_qty > 0:
resting_order = Order(
agent_id=sell_order.agent_id,
side=Side.SELL,
price=sell_order.price,
quantity=remaining_qty,
timestamp=sell_order.timestamp,
)
self._insert_ask(resting_order)
return trades
def _insert_bid(self, order: Order):
"""Insert a bid order maintaining descending price, ascending timestamp order."""
import bisect
# For bids: we want descending price, ascending timestamp.
# bisect uses < operator, so we use a key that negates price but keeps timestamp positive.
bisect.insort(self.bids, order, key=lambda x: (-x.price, x.timestamp))
def _insert_ask(self, order: Order):
"""Insert an ask order maintaining ascending price, ascending timestamp order."""
import bisect
# For asks: we want ascending price, ascending timestamp.
bisect.insort(self.asks, order, key=lambda x: (x.price, x.timestamp))
def cancel_agent_orders(self, agent_id: str):
"""Remove all resting orders for a given agent. Used between ticks."""
self.bids = [o for o in self.bids if o.agent_id != agent_id]
self.asks = [o for o in self.asks if o.agent_id != agent_id]
def clear_book(self):
"""Remove all resting orders. Used for book reset between experiments."""
self.bids.clear()
self.asks.clear()
def snapshot(self) -> dict:
"""
Return a snapshot of the current order book state.
Used by market_state serializer to build the LLM prompt.
"""
return {
"best_bid": self.best_bid,
"best_ask": self.best_ask,
"mid_price": self.mid_price,
"spread": self.spread,
"bid_depth": sum(o.quantity for o in self.bids),
"ask_depth": sum(o.quantity for o in self.asks),
"bid_levels": len(self.bids),
"ask_levels": len(self.asks),
"last_trade_price": self.trade_log[-1].price if self.trade_log else None,
"last_trade_qty": self.trade_log[-1].quantity if self.trade_log else None,
"total_trades": len(self.trade_log),
}
def __repr__(self) -> str:
bb = f"{self.best_bid:.2f}" if self.best_bid else "---"
ba = f"{self.best_ask:.2f}" if self.best_ask else "---"
sp = f"{self.spread:.4f}" if self.spread else "---"
return f"OrderBook(bid={bb}, ask={ba}, spread={sp}, bids={len(self.bids)}, asks={len(self.asks)})"