"""Limit Order Book (LOB) Reconstruction and Level 2 Features

What Jane Street sees that retail doesn't:
- Full Level 2 order book (10+ price levels, not just best bid/ask)
- Queue position for each order
- Order arrival/cancel rates
- Market depth profile
- Spread dynamics (widening = informed trading)
- Large order detection

This is the foundation of HIGH-FREQUENCY alpha.
"""

import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional, NamedTuple
from collections import defaultdict
import bisect
import warnings

# NOTE(review): this silences every warning process-wide for anything that
# imports this module; kept for backward compatibility.
warnings.filterwarnings('ignore')


class OrderBookEntry:
    """A single resting limit order.

    Attributes:
        price: Limit price of the order.
        quantity: Remaining quantity; reduced in place as the order fills.
        order_id: Identifier used to find the order again (e.g. to cancel).
        side: ``'bid'`` or ``'ask'``.
        timestamp: Arrival time as supplied by the caller.
    """

    def __init__(self, price: float, quantity: int, order_id: str,
                 side: str, timestamp: float):
        self.price = price
        self.quantity = quantity
        self.order_id = order_id
        self.side = side  # 'bid' or 'ask'
        self.timestamp = timestamp

    def __repr__(self):
        return f"{self.side.upper()} {self.quantity}@{self.price:.2f}"


class LimitOrderBook:
    """Limit order book rebuilt from add / cancel / trade messages.

    Each side maps ``price -> list of OrderBookEntry`` (arrival order, so the
    first entry is filled first) and keeps a sorted list of its live prices:
    ``bid_prices`` descending and ``ask_prices`` ascending, so index 0 is
    always the best price of that side.

    The book does not match incoming limit orders against the opposite side;
    only ``execute_trade`` removes liquidity.
    """

    _SIDES = ('bid', 'ask')

    def __init__(self, max_depth: int = 10):
        """Create an empty book.

        Args:
            max_depth: Default number of price levels per side reported by
                ``get_book_snapshot`` when no depth is given.
        """
        self.max_depth = max_depth
        self.bids = {}  # price -> list of OrderBookEntry
        self.asks = {}  # price -> list of OrderBookEntry
        self.bid_prices = []  # Sorted descending
        self.ask_prices = []  # Sorted ascending
        self.order_map = {}  # order_id -> (side, price)

        # Statistics
        self.trade_history = []
        self.imbalance_history = []
        self.spread_history = []
        self.depth_history = []

    def _side(self, side: str) -> Tuple[Dict, List]:
        """Return ``(price -> orders dict, sorted price list)`` for *side*."""
        if side == 'bid':
            return self.bids, self.bid_prices
        return self.asks, self.ask_prices

    @staticmethod
    def _insert_descending(prices: List[float], price: float) -> None:
        """Insert *price* into *prices*, which is sorted high-to-low."""
        lo, hi = 0, len(prices)
        while lo < hi:
            mid = (lo + hi) // 2
            if prices[mid] > price:
                lo = mid + 1
            else:
                hi = mid
        prices.insert(lo, price)

    def add_order(self, order: OrderBookEntry):
        """Add a resting limit order to the book.

        Args:
            order: The order to rest at ``order.price`` on ``order.side``.

        Raises:
            ValueError: If ``order.side`` is not ``'bid'`` or ``'ask'``, or
                if an order with the same ``order_id`` is already live.
        """
        if order.side not in self._SIDES:
            raise ValueError(
                f"side must be 'bid' or 'ask', got {order.side!r}")
        if order.order_id in self.order_map:
            # Overwriting the map entry would leave the earlier order in the
            # book with no way to cancel it.
            raise ValueError(f"duplicate order_id: {order.order_id!r}")

        levels, prices = self._side(order.side)
        if order.price not in levels:
            levels[order.price] = []
            if order.side == 'bid':
                self._insert_descending(prices, order.price)
            else:
                bisect.insort(prices, order.price)
        levels[order.price].append(order)
        self.order_map[order.order_id] = (order.side, order.price)

    def cancel_order(self, order_id: str):
        """Remove a live order from the book.

        Args:
            order_id: Identifier of the order to remove.

        Returns:
            ``True`` if the order was live and has been removed, ``False`` if
            the id is unknown (never added, already cancelled or fully
            filled).
        """
        location = self.order_map.pop(order_id, None)
        if location is None:
            return False

        side, price = location
        levels, prices = self._side(side)
        queue = levels.get(price)
        if queue is not None:
            survivors = [o for o in queue if o.order_id != order_id]
            if survivors:
                levels[price] = survivors
            else:
                # Last order at this price: drop the whole level.
                del levels[price]
                prices.remove(price)
        return True

    def execute_trade(self, side: str, quantity: int,
                      aggressive: bool = True) -> Tuple[float, int]:
        """Match an incoming order against the opposite side of the book.

        Liquidity is consumed from the best opposite price outwards, and in
        arrival order within a price level. There is no price limit: matching
        stops only when *quantity* is filled or the opposite side is empty.

        Args:
            side: Side of the incoming order; ``'bid'`` consumes asks and
                ``'ask'`` consumes bids.
            quantity: Quantity to execute; non-positive values fill nothing.
            aggressive: Stored on the trade record only; it does not change
                how the order is matched.

        Returns:
            ``(avg_price, executed_qty)``; ``avg_price`` is ``0.0`` when
            nothing was executed.

        Raises:
            ValueError: If *side* is not ``'bid'`` or ``'ask'``.
        """
        if side not in self._SIDES:
            raise ValueError(f"side must be 'bid' or 'ask', got {side!r}")

        resting_side = 'ask' if side == 'bid' else 'bid'
        levels, prices = self._side(resting_side)

        remaining = quantity
        total_cost = 0.0
        while remaining > 0 and prices:
            best_price = prices[0]
            queue = levels.get(best_price)
            if queue is None:
                # Stale price with no level behind it: discard and move on.
                prices.pop(0)
                continue

            while remaining > 0 and queue:
                resting = queue[0]
                fill = min(remaining, resting.quantity)
                total_cost += fill * best_price
                remaining -= fill
                resting.quantity -= fill
                if resting.quantity <= 0:
                    queue.pop(0)
                    self.order_map.pop(resting.order_id, None)

            if not queue:
                del levels[best_price]
                prices.pop(0)

        executed = quantity - remaining
        avg_price = total_cost / executed if executed > 0 else 0.0

        # Record trade
        if executed > 0:
            self.trade_history.append({
                'side': side,
                'quantity': executed,
                'avg_price': avg_price,
                'aggressive': aggressive,
            })

        return avg_price, executed

    def get_best_bid(self) -> Optional[float]:
        """Return the highest bid price, or ``None`` if there are no bids."""
        return self.bid_prices[0] if self.bid_prices else None

    def get_best_ask(self) -> Optional[float]:
        """Return the lowest ask price, or ``None`` if there are no asks."""
        return self.ask_prices[0] if self.ask_prices else None

    def get_mid_price(self) -> Optional[float]:
        """Return the mean of best bid and best ask, or ``None`` if a side is empty."""
        best_bid = self.get_best_bid()
        best_ask = self.get_best_ask()
        if best_bid is None or best_ask is None:
            return None
        return (best_bid + best_ask) / 2

    def get_spread(self) -> Optional[float]:
        """Return best ask minus best bid, or ``None`` if a side is empty."""
        best_bid = self.get_best_bid()
        best_ask = self.get_best_ask()
        if best_bid is None or best_ask is None:
            return None
        return best_ask - best_bid

    def get_spread_bps(self) -> Optional[float]:
        """Return the spread in basis points of the mid price.

        Returns ``None`` if a side is empty or the mid price is zero.
        """
        spread = self.get_spread()
        mid = self.get_mid_price()
        if spread is None or mid is None or mid == 0:
            return None
        return (spread / mid) * 10000

    @staticmethod
    def _summarize_levels(levels: Dict, prices: List[float], side: str,
                          depth: int) -> List[Dict]:
        """Aggregate the first *depth* price levels of one side."""
        return [
            {
                'price': price,
                'quantity': sum(o.quantity for o in levels[price]),
                'num_orders': len(levels[price]),
                'side': side,
            }
            for price in prices[:depth]
            if price in levels
        ]

    def get_book_snapshot(self, depth: Optional[int] = None) -> Dict:
        """Return an aggregated view of the book.

        Args:
            depth: Number of price levels per side to list; ``None`` uses
                ``self.max_depth``. ``0`` lists no levels.

        Returns:
            A dict with ``bids`` / ``asks`` (lists of per-level dicts with
            ``price``, ``quantity``, ``num_orders``, ``side``, best price
            first), ``mid_price``, ``spread``, ``spread_bps``, ``bid_depth``
            / ``ask_depth`` (number of live price levels) and
            ``total_bid_quantity`` / ``total_ask_quantity``. The depth and
            total fields cover the whole book, not just the listed levels.
        """
        if depth is None:
            depth = self.max_depth

        return {
            'bids': self._summarize_levels(
                self.bids, self.bid_prices, 'bid', depth),
            'asks': self._summarize_levels(
                self.asks, self.ask_prices, 'ask', depth),
            'mid_price': self.get_mid_price(),
            'spread': self.get_spread(),
            'spread_bps': self.get_spread_bps(),
            'bid_depth': len(self.bid_prices),
            'ask_depth': len(self.ask_prices),
            'total_bid_quantity': sum(
                o.quantity for orders in self.bids.values() for o in orders),
            'total_ask_quantity': sum(
                o.quantity for orders in self.asks.values() for o in orders),
        }

    def get_order_imbalance(self, levels: int = 5) -> float:
        """Return the quantity imbalance over the top *levels* price levels.

        Computed as ``(bid_qty - ask_qty) / (bid_qty + ask_qty)``: positive
        when more quantity rests on the bid side, negative when more rests on
        the ask side, and ``0.0`` when both are empty.
        """
        bid_qty = sum(
            o.quantity
            for price in self.bid_prices[:levels] if price in self.bids
            for o in self.bids[price]
        )
        ask_qty = sum(
            o.quantity
            for price in self.ask_prices[:levels] if price in self.asks
            for o in self.asks[price]
        )
        total = bid_qty + ask_qty
        if total == 0:
            return 0.0
        return (bid_qty - ask_qty) / total

    def copy(self) -> "LimitOrderBook":
        """Return an independent copy of the book.

        Orders and history records are duplicated, so later changes to either
        book do not affect the other.
        """
        def clone_levels(levels: Dict) -> Dict:
            return {
                price: [
                    OrderBookEntry(o.price, o.quantity, o.order_id,
                                   o.side, o.timestamp)
                    for o in orders
                ]
                for price, orders in levels.items()
            }

        clone = LimitOrderBook(max_depth=self.max_depth)
        clone.bids = clone_levels(self.bids)
        clone.asks = clone_levels(self.asks)
        clone.bid_prices = list(self.bid_prices)
        clone.ask_prices = list(self.ask_prices)
        clone.order_map = dict(self.order_map)
        clone.trade_history = [dict(t) for t in self.trade_history]
        clone.imbalance_history = list(self.imbalance_history)
        clone.spread_history = list(self.spread_history)
        clone.depth_history = list(self.depth_history)
        return clone


class LOBFeatures:
    """Feature extraction helpers operating on a ``LimitOrderBook``.

    All methods are static and read the book without modifying it.
    """

    @staticmethod
    def price_levels(book: LimitOrderBook, n: int = 10) -> pd.DataFrame:
        """Return the top *n* price levels of each side as a DataFrame.

        Columns are ``side``, ``level`` (1 = best), ``price``, ``quantity``
        and ``num_orders``; bid rows come first. The columns are present even
        when the book is empty.
        """
        snapshot = book.get_book_snapshot(depth=n)
        rows = [
            {
                'side': side,
                'level': rank,
                'price': level['price'],
                'quantity': level['quantity'],
                'num_orders': level['num_orders'],
            }
            for side, key in (('bid', 'bids'), ('ask', 'asks'))
            for rank, level in enumerate(snapshot[key], start=1)
        ]
        return pd.DataFrame(
            rows,
            columns=['side', 'level', 'price', 'quantity', 'num_orders'])

    @staticmethod
    def depth_profile(book: LimitOrderBook) -> Dict:
        """Return cumulative depth at 1, 5 and 10 levels plus depth ratios.

        Levels come from ``book.get_book_snapshot()`` and are therefore
        capped at ``book.max_depth``. A cumulative depth that asks for more
        levels than exist uses all available levels.

        ``depth_ratio_1`` / ``depth_ratio_5`` are bid depth divided by ask
        depth, and fall back to the neutral value ``1.0`` when either side
        has fewer than 1 / 5 levels or the ask depth is zero.
        ``depth_skew`` is ``(bid - ask) / (bid + ask + 1)`` over the total
        quantity of the whole book.
        """
        snapshot = book.get_book_snapshot()
        bid_levels = snapshot['bids']
        ask_levels = snapshot['asks']

        def cumulative(levels: List[Dict], k: int):
            # Sum of the first k levels (or of all levels if fewer exist).
            return sum(level['quantity'] for level in levels[:k])

        def ratio(k: int) -> float:
            if len(bid_levels) < k or len(ask_levels) < k:
                return 1.0
            ask_depth = cumulative(ask_levels, k)
            if ask_depth <= 0:
                return 1.0
            return cumulative(bid_levels, k) / ask_depth

        total_bid = snapshot['total_bid_quantity']
        total_ask = snapshot['total_ask_quantity']
        return {
            'bid_depth_1': cumulative(bid_levels, 1),
            'bid_depth_5': cumulative(bid_levels, 5),
            'bid_depth_10': cumulative(bid_levels, 10),
            'ask_depth_1': cumulative(ask_levels, 1),
            'ask_depth_5': cumulative(ask_levels, 5),
            'ask_depth_10': cumulative(ask_levels, 10),
            'depth_ratio_1': ratio(1),
            'depth_ratio_5': ratio(5),
            # +1 keeps the denominator non-zero for an empty book.
            'depth_skew': (total_bid - total_ask) / (total_bid + total_ask + 1),
        }

    @staticmethod
    def queue_features(book: LimitOrderBook) -> Dict:
        """Return order count and quantity queued at the best bid and ask.

        ``queue_imbalance`` is the best-bid order count minus the best-ask
        order count. Missing sides contribute zeros.
        """
        snapshot = book.get_book_snapshot(depth=1)
        best_bid = snapshot['bids'][0] if snapshot['bids'] else None
        best_ask = snapshot['asks'][0] if snapshot['asks'] else None

        bid_orders = best_bid['num_orders'] if best_bid else 0
        ask_orders = best_ask['num_orders'] if best_ask else 0
        return {
            'bid_queue_length': bid_orders,
            'ask_queue_length': ask_orders,
            'bid_queue_qty': best_bid['quantity'] if best_bid else 0,
            'ask_queue_qty': best_ask['quantity'] if best_ask else 0,
            'queue_imbalance': bid_orders - ask_orders,
        }

    @staticmethod
    def large_order_detection(book: LimitOrderBook,
                              threshold_qty: float = 1000,
                              threshold_pct: float = 0.3) -> List[Dict]:
        """List resting orders whose quantity is at least *threshold_qty*.

        Args:
            book: Book to scan (every price level on both sides).
            threshold_qty: Minimum order quantity to be reported.
            threshold_pct: Share of its price level's quantity above which a
                reported order is flagged in ``exceeds_pct_threshold``. It
                does not affect which orders are reported.

        Returns:
            Dicts sorted by quantity, largest first, with ``side``,
            ``price``, ``quantity``, ``pct_of_level``, ``is_iceberg`` (the
            order is more than 3x the mean order size at its price level; a
            heuristic only) and ``exceeds_pct_threshold``.
        """
        large_orders = []
        for side_name, side_levels in (('bid', book.bids), ('ask', book.asks)):
            for price, orders in side_levels.items():
                if not orders:
                    continue
                level_qty = sum(o.quantity for o in orders)
                mean_qty = level_qty / len(orders)
                for order in orders:
                    if order.quantity < threshold_qty:
                        continue
                    share = order.quantity / level_qty if level_qty > 0 else 0
                    large_orders.append({
                        'side': side_name,
                        'price': price,
                        'quantity': order.quantity,
                        'pct_of_level': share,
                        'is_iceberg': order.quantity > mean_qty * 3,  # Likely iceberg
                        'exceeds_pct_threshold': share >= threshold_pct,
                    })

        return sorted(large_orders, key=lambda x: x['quantity'], reverse=True)

    @staticmethod
    def spread_dynamics(book_history: List[LimitOrderBook],
                        window: int = 10) -> Dict:
        """Summarize spread, mid price and imbalance over recent books.

        Args:
            book_history: Books in chronological order. For the result to
                describe change over time they must be independent
                snapshots, not repeated references to one live book.
            window: Number of most recent books to use.

        Returns:
            An empty dict if fewer than two books in the window have a
            spread; otherwise ``avg_spread_bps``, ``spread_volatility``
            (standard deviation), ``spread_trend`` (last minus first),
            ``spread_percentile`` (share of spreads not above the latest),
            ``mid_price_change_pct``, ``avg_imbalance`` and
            ``imbalance_trend``.
        """
        spreads = []
        mids = []
        imbalances = []
        for past_book in book_history[-window:]:
            spread_bps = past_book.get_spread_bps()
            mid = past_book.get_mid_price()
            if spread_bps is not None:
                spreads.append(spread_bps)
            if mid is not None:
                mids.append(mid)
            imbalances.append(past_book.get_order_imbalance())

        if len(spreads) < 2:
            return {}

        latest_spread = spreads[-1]
        if len(mids) >= 2 and mids[0] > 0:
            mid_change_pct = (mids[-1] / mids[0] - 1) * 100
        else:
            mid_change_pct = 0
        return {
            'avg_spread_bps': float(np.mean(spreads)),
            'spread_volatility': float(np.std(spreads)),
            'spread_trend': latest_spread - spreads[0],
            'spread_percentile':
                sum(1 for s in spreads if s <= latest_spread) / len(spreads),
            'mid_price_change_pct': mid_change_pct,
            'avg_imbalance': float(np.mean(imbalances)),
            'imbalance_trend': imbalances[-1] - imbalances[0],
        }

    @staticmethod
    def order_flow_toxicity(book: LimitOrderBook, trade_history: List[Dict],
                            window: int = 50) -> Dict:
        """Return a VPIN-like volume imbalance over recent trades.

        Each trade is counted once: as buy volume when its ``side`` is
        ``'bid'`` and as sell volume when it is ``'ask'``. Trades with any
        other side are ignored. The metric is
        ``|buy_volume - sell_volume| / (buy_volume + sell_volume)``.

        Args:
            book: Unused; accepted for signature compatibility.
            trade_history: Trade dicts with ``side`` and ``quantity`` keys,
                oldest first. ``side`` is assumed to be the side of the
                incoming order, as recorded in
                ``LimitOrderBook.trade_history`` - confirm for other sources.
            window: Number of most recent trades to use.

        Returns:
            Dict with ``vpin_approx``, ``toxicity`` (same value),
            ``buy_volume``, ``sell_volume`` and ``total_volume``; all zero
            when there is no classifiable volume.
        """
        recent_trades = trade_history[-window:] if trade_history else []
        buy_volume = sum(
            t['quantity'] for t in recent_trades if t.get('side') == 'bid')
        sell_volume = sum(
            t['quantity'] for t in recent_trades if t.get('side') == 'ask')
        total = buy_volume + sell_volume

        # Toxicity = |buy_vol - sell_vol| / total
        vpin = abs(buy_volume - sell_volume) / total if total else 0.0
        return {
            'vpin_approx': vpin,
            'toxicity': vpin,
            'buy_volume': buy_volume,
            'sell_volume': sell_volume,
            'total_volume': total,
        }

    @staticmethod
    def all_features(book: LimitOrderBook,
                     book_history: Optional[List[LimitOrderBook]] = None
                     ) -> Dict:
        """Compute the snapshot, depth, queue and large-order features.

        Depth-profile keys are prefixed with ``depth_``, queue keys with
        ``queue_`` and spread-dynamics keys with ``spread_dyn_``. Spread
        dynamics are included only when *book_history* holds at least two
        books.
        """
        features = {}

        # Basic features
        snapshot = book.get_book_snapshot()
        features['mid_price'] = snapshot['mid_price']
        features['spread'] = snapshot['spread']
        features['spread_bps'] = snapshot['spread_bps']
        features['bid_depth_total'] = snapshot['total_bid_quantity']
        features['ask_depth_total'] = snapshot['total_ask_quantity']
        features['depth_imbalance'] = book.get_order_imbalance()

        # Depth profile
        depth = LOBFeatures.depth_profile(book)
        features.update({f'depth_{k}': v for k, v in depth.items()})

        # Queue features
        queue = LOBFeatures.queue_features(book)
        features.update({f'queue_{k}': v for k, v in queue.items()})

        # Large orders
        large = LOBFeatures.large_order_detection(book)
        features['n_large_orders'] = len(large)
        features['large_order_total_qty'] = sum(o['quantity'] for o in large)

        # Spread dynamics
        if book_history and len(book_history) >= 2:
            dynamics = LOBFeatures.spread_dynamics(book_history)
            features.update(
                {f'spread_dyn_{k}': v for k, v in dynamics.items()})

        return features


def generate_synthetic_lob_feed(n_messages: int = 1000,
                                base_price: float = 100.0,
                                tick_size: float = 0.01,
                                seed: Optional[int] = 42) -> List[Dict]:
    """Generate a synthetic add / cancel / trade message feed for testing.

    The first (up to) 50 messages are adds within 50 ticks of *base_price*
    (bids at or below it, asks at or above it); the rest are drawn as add,
    cancel or trade with probabilities 0.5 / 0.3 / 0.2. Cancels pick a random
    previously issued order id, which may no longer be live.

    Args:
        n_messages: Exact number of messages to return.
        base_price: Price around which orders are placed.
        tick_size: Price increment used for offsets. Prices are rounded to
            2 decimals regardless of this value.
        seed: Seed for a private random generator; the global numpy random
            state is not touched. ``None`` seeds from OS entropy.

    Returns:
        Message dicts with ``type`` and ``timestamp`` plus, depending on
        type, ``order_id``, ``side``, ``price`` and ``quantity``.
    """
    rng = np.random.RandomState(seed)
    messages = []
    order_counter = 0
    n_initial = max(0, min(50, n_messages))

    # Initialize with some orders
    for _ in range(n_initial):
        side = 'bid' if rng.rand() < 0.5 else 'ask'
        price = base_price + rng.randint(-50, 50) * tick_size
        if side == 'ask':
            price = max(price, base_price)
        else:
            price = min(price, base_price)
        messages.append({
            'type': 'add',
            'order_id': f'order_{order_counter}',
            'side': side,
            'price': float(round(price, 2)),
            'quantity': int(rng.randint(100, 1000)),
            'timestamp': len(messages) / 1000.0,
        })
        order_counter += 1

    # Generate flowing messages
    for _ in range(n_messages - n_initial):
        msg_type = rng.choice(['add', 'cancel', 'trade'], p=[0.5, 0.3, 0.2])

        if msg_type == 'add':
            side = 'bid' if rng.rand() < 0.5 else 'ask'
            offset = rng.exponential(10) * tick_size
            price = base_price + (offset if side == 'ask' else -offset)
            price = float(round(max(price, 0.01), 2))
            messages.append({
                'type': 'add',
                'order_id': f'order_{order_counter}',
                'side': side,
                'price': price,
                'quantity': int(rng.randint(100, 2000)),
                'timestamp': len(messages) / 1000.0,
            })
            order_counter += 1
        elif msg_type == 'cancel' and order_counter > 0:
            # Cancel a random existing order
            messages.append({
                'type': 'cancel',
                'order_id': f'order_{rng.randint(0, order_counter)}',
                'timestamp': len(messages) / 1000.0,
            })
        else:
            # Trade
            side = 'bid' if rng.rand() < 0.5 else 'ask'
            messages.append({
                'type': 'trade',
                'side': side,
                'quantity': int(rng.randint(100, 500)),
                'timestamp': len(messages) / 1000.0,
            })

    return messages


def process_message_feed(messages: List[Dict], snapshot_interval: int = 100
                         ) -> Tuple[LimitOrderBook, List[Dict], List[Dict]]:
    """Replay a message feed into a fresh book and collect features.

    ``add`` and ``cancel`` messages are applied directly. For a ``trade``
    message the order is submitted from the side opposite to
    ``msg['side']``, so liquidity resting on ``msg['side']`` is consumed;
    the recorded trade keeps ``msg['side']``. Trades that fill nothing are
    not recorded.

    Args:
        messages: Feed in chronological order, in the format produced by
            ``generate_synthetic_lob_feed``.
        snapshot_interval: A copy of the book is stored and features are
            computed after messages 0, ``snapshot_interval``,
            ``2 * snapshot_interval``, ...

    Returns:
        ``(book, trades, features_history)``: the final book, the executed
        trades, and one feature dict (with a ``timestamp`` key) per snapshot.

    Raises:
        ValueError: If *snapshot_interval* is not positive.
    """
    if snapshot_interval <= 0:
        raise ValueError("snapshot_interval must be positive")

    book = LimitOrderBook(max_depth=20)
    trades = []
    book_history = []
    features_history = []

    for index, msg in enumerate(messages):
        msg_type = msg['type']
        if msg_type == 'add':
            book.add_order(OrderBookEntry(
                price=msg['price'],
                quantity=msg['quantity'],
                order_id=msg['order_id'],
                side=msg['side'],
                timestamp=msg['timestamp'],
            ))
        elif msg_type == 'cancel':
            book.cancel_order(msg['order_id'])
        elif msg_type == 'trade':
            side = 'bid' if msg['side'] == 'ask' else 'ask'  # Opposite side
            avg_price, qty = book.execute_trade(
                side, msg['quantity'], aggressive=True)
            if qty > 0:
                trades.append({
                    'timestamp': msg['timestamp'],
                    'side': msg['side'],
                    'quantity': qty,
                    'avg_price': avg_price,
                })

        # Snapshot every `snapshot_interval` messages. Store a copy: the live
        # book keeps changing, and the history must not change with it.
        if index % snapshot_interval == 0:
            book_history.append(book.copy())
            features = LOBFeatures.all_features(book, book_history)
            features['timestamp'] = msg['timestamp']
            features_history.append(features)

    return book, trades, features_history


def main() -> None:
    """Run the demo: build a book from a synthetic feed and print a summary."""
    print("=" * 70)
    print(" LIMIT ORDER BOOK RECONSTRUCTION")
    print("=" * 70)

    # Generate synthetic data
    messages = generate_synthetic_lob_feed(n_messages=5000)

    # Process
    book, trades, features = process_message_feed(messages)

    # Final snapshot
    snapshot = book.get_book_snapshot(depth=5)
    mid = snapshot['mid_price']
    spread_bps = snapshot['spread_bps']
    # Either value is None when one side of the book is empty.
    mid_text = f"${mid:.2f}" if mid is not None else "n/a"
    spread_text = f"{spread_bps:.1f} bps" if spread_bps is not None else "n/a"

    print(f"\nFinal LOB State:")
    print(f" Mid Price: {mid_text}")
    print(f" Spread: {spread_text}")
    print(f" Bid Depth: {snapshot['bid_depth']} levels")
    print(f" Ask Depth: {snapshot['ask_depth']} levels")
    print(f" Total Bid Qty: {snapshot['total_bid_quantity']:,}")
    print(f" Total Ask Qty: {snapshot['total_ask_quantity']:,}")
    print(f" Order Imbalance: {book.get_order_imbalance():.3f}")

    # Level 2
    print(f"\nLevel 2 Book (top 5):")
    levels = LOBFeatures.price_levels(book, n=5)
    print(levels.to_string())

    # Features
    if features:
        print(f"\nLatest LOB Features:")
        latest = features[-1]
        for k, v in latest.items():
            if isinstance(v, (int, float)):
                print(f" {k}: {v:.4f}")

    print(f"\n Trades executed: {len(trades)}")
    print(f" Total messages processed: {len(messages)}")
    print(f"\n This is what Jane Street sees every microsecond.")
    print(f" Order imbalance, queue position, depth profile = PURE ALPHA.")


if __name__ == '__main__':
    main()