File size: 8,601 Bytes
10150dc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 | """
Continuous Double Auction (CDA) Matching Engine.
Implements a limit order book with price-time priority matching.
This is the core market mechanism — all agent orders flow through here.
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class Side(Enum):
BUY = "buy"
SELL = "sell"
@dataclass
class Order:
"""A single limit order submitted by an agent."""
agent_id: str
side: Side
price: float
quantity: int
timestamp: int # tick number — used for time priority
def __post_init__(self):
if self.quantity <= 0:
raise ValueError(f"Order quantity must be positive, got {self.quantity}")
if self.price <= 0:
raise ValueError(f"Order price must be positive, got {self.price}")
@dataclass
class Trade:
"""A completed trade between two orders."""
tick: int
price: float
quantity: int
buyer_id: str
seller_id: str
aggressor_side: Side # who crossed the spread
@property
def value(self) -> float:
return self.price * self.quantity
class OrderBook:
"""
Limit order book with price-time priority matching.
Bids sorted descending (best bid = highest price first).
Asks sorted ascending (best ask = lowest price first).
Within same price level, earlier orders match first (FIFO).
"""
def __init__(self):
# Lists of Order, kept sorted after each insertion
self.bids: list[Order] = [] # sorted: highest price first, then earliest timestamp
self.asks: list[Order] = [] # sorted: lowest price first, then earliest timestamp
self.trade_log: list[Trade] = []
self._tick: int = 0
@property
def best_bid(self) -> Optional[float]:
"""Highest bid price, or None if no bids."""
return self.bids[0].price if self.bids else None
@property
def best_ask(self) -> Optional[float]:
"""Lowest ask price, or None if no asks."""
return self.asks[0].price if self.asks else None
@property
def mid_price(self) -> Optional[float]:
"""Midpoint between best bid and best ask, or None if either side is empty."""
if self.best_bid is not None and self.best_ask is not None:
return (self.best_bid + self.best_ask) / 2.0
return None
@property
def spread(self) -> Optional[float]:
"""Bid-ask spread, or None if either side is empty."""
if self.best_bid is not None and self.best_ask is not None:
return self.best_ask - self.best_bid
return None
def set_tick(self, tick: int):
"""Advance the internal tick clock. Called by the simulation loop."""
self._tick = tick
def submit_order(self, order: Order) -> list[Trade]:
"""
Submit an order to the book. Attempts to match immediately.
Any unmatched residual rests in the book.
Returns list of trades executed by this order (possibly empty).
"""
trades: list[Trade] = []
if order.side == Side.BUY:
trades = self._match_buy(order)
elif order.side == Side.SELL:
trades = self._match_sell(order)
self.trade_log.extend(trades)
return trades
def _match_buy(self, buy_order: Order) -> list[Trade]:
"""Match an incoming buy order against resting asks."""
trades: list[Trade] = []
remaining_qty = buy_order.quantity
while remaining_qty > 0 and self.asks:
best_ask_order = self.asks[0]
# Buy can only match if its price >= best ask price
if buy_order.price < best_ask_order.price:
break
# Determine fill quantity
fill_qty = min(remaining_qty, best_ask_order.quantity)
fill_price = best_ask_order.price # price-time priority: passive order's price
trade = Trade(
tick=self._tick,
price=fill_price,
quantity=fill_qty,
buyer_id=buy_order.agent_id,
seller_id=best_ask_order.agent_id,
aggressor_side=Side.BUY,
)
trades.append(trade)
remaining_qty -= fill_qty
best_ask_order.quantity -= fill_qty
# Remove fully filled ask
if best_ask_order.quantity == 0:
self.asks.pop(0)
# Rest any unfilled portion in the bid book
if remaining_qty > 0:
resting_order = Order(
agent_id=buy_order.agent_id,
side=Side.BUY,
price=buy_order.price,
quantity=remaining_qty,
timestamp=buy_order.timestamp,
)
self._insert_bid(resting_order)
return trades
def _match_sell(self, sell_order: Order) -> list[Trade]:
"""Match an incoming sell order against resting bids."""
trades: list[Trade] = []
remaining_qty = sell_order.quantity
while remaining_qty > 0 and self.bids:
best_bid_order = self.bids[0]
# Sell can only match if its price <= best bid price
if sell_order.price > best_bid_order.price:
break
# Determine fill quantity
fill_qty = min(remaining_qty, best_bid_order.quantity)
fill_price = best_bid_order.price # passive order's price
trade = Trade(
tick=self._tick,
price=fill_price,
quantity=fill_qty,
buyer_id=best_bid_order.agent_id,
seller_id=sell_order.agent_id,
aggressor_side=Side.SELL,
)
trades.append(trade)
remaining_qty -= fill_qty
best_bid_order.quantity -= fill_qty
# Remove fully filled bid
if best_bid_order.quantity == 0:
self.bids.pop(0)
# Rest any unfilled portion in the ask book
if remaining_qty > 0:
resting_order = Order(
agent_id=sell_order.agent_id,
side=Side.SELL,
price=sell_order.price,
quantity=remaining_qty,
timestamp=sell_order.timestamp,
)
self._insert_ask(resting_order)
return trades
def _insert_bid(self, order: Order):
"""Insert a bid order maintaining descending price, ascending timestamp order."""
import bisect
# For bids: we want descending price, ascending timestamp.
# bisect uses < operator, so we use a key that negates price but keeps timestamp positive.
bisect.insort(self.bids, order, key=lambda x: (-x.price, x.timestamp))
def _insert_ask(self, order: Order):
"""Insert an ask order maintaining ascending price, ascending timestamp order."""
import bisect
# For asks: we want ascending price, ascending timestamp.
bisect.insort(self.asks, order, key=lambda x: (x.price, x.timestamp))
def cancel_agent_orders(self, agent_id: str):
"""Remove all resting orders for a given agent. Used between ticks."""
self.bids = [o for o in self.bids if o.agent_id != agent_id]
self.asks = [o for o in self.asks if o.agent_id != agent_id]
def clear_book(self):
"""Remove all resting orders. Used for book reset between experiments."""
self.bids.clear()
self.asks.clear()
def snapshot(self) -> dict:
"""
Return a snapshot of the current order book state.
Used by market_state serializer to build the LLM prompt.
"""
return {
"best_bid": self.best_bid,
"best_ask": self.best_ask,
"mid_price": self.mid_price,
"spread": self.spread,
"bid_depth": sum(o.quantity for o in self.bids),
"ask_depth": sum(o.quantity for o in self.asks),
"bid_levels": len(self.bids),
"ask_levels": len(self.asks),
"last_trade_price": self.trade_log[-1].price if self.trade_log else None,
"last_trade_qty": self.trade_log[-1].quantity if self.trade_log else None,
"total_trades": len(self.trade_log),
}
def __repr__(self) -> str:
bb = f"{self.best_bid:.2f}" if self.best_bid else "---"
ba = f"{self.best_ask:.2f}" if self.best_ask else "---"
sp = f"{self.spread:.4f}" if self.spread else "---"
return f"OrderBook(bid={bb}, ask={ba}, spread={sp}, bids={len(self.bids)}, asks={len(self.asks)})"
|