# Add Level 2 LOB reconstruction with full order book, queue position, depth profile, spread dynamics
# 2b738f4 verified
"""Limit Order Book (LOB) Reconstruction and Level 2 Features

What Jane Street sees that retail doesn't:
- Full Level 2 order book (10+ price levels, not just best bid/ask)
- Queue position for each order
- Order arrival/cancel rates
- Market depth profile
- Spread dynamics (widening = informed trading)
- Large order detection

This is the foundation of HIGH-FREQUENCY alpha.
"""
import bisect
import copy
import warnings
from collections import defaultdict
from typing import Dict, List, Tuple, Optional, NamedTuple

import numpy as np
import pandas as pd
# NOTE(review): with no category/module filter this silences every warning
# for the whole process as soon as the module is imported.
warnings.filterwarnings('ignore')
class OrderBookEntry:
    """One resting limit order: price, size, identifier, side and arrival time.

    All fields are plain mutable attributes.
    """

    def __init__(self, price: float, quantity: int, order_id: str,
                 side: str, timestamp: float):
        # side is expected to be 'bid' or 'ask'
        self.price, self.quantity = price, quantity
        self.order_id, self.side = order_id, side
        self.timestamp = timestamp

    def __repr__(self):
        # Rendered as e.g. "BID 250@100.50"
        label = self.side.upper()
        return "{} {}@{:.2f}".format(label, self.quantity, self.price)
class LimitOrderBook:
    """Limit order book rebuilt from add / cancel / trade events.

    State:
        bids / asks: dict mapping price -> list of resting orders at that
            price, oldest first (orders are appended on arrival and consumed
            from the front on execution).
        bid_prices / ask_prices: sorted lists of the populated price levels;
            bids descending, asks ascending, so index 0 is the best price on
            each side.
        order_map: order_id -> (side, price), used to locate an order on
            cancel.

    Orders are duck-typed: any object exposing ``price``, ``quantity``,
    ``order_id`` and ``side`` works.  ``add_order`` never matches an incoming
    order against the opposite side; only ``execute_trade`` removes
    liquidity, so the book can end up locked or crossed.
    """

    def __init__(self, max_depth: int = 10):
        # Default number of levels per side returned by get_book_snapshot().
        self.max_depth = max_depth
        self.bids = {}  # price -> list of OrderBookEntry
        self.asks = {}  # price -> list of OrderBookEntry
        self.bid_prices = []  # Sorted descending
        self.ask_prices = []  # Sorted ascending
        self.order_map = {}  # order_id -> (side, price)
        # Statistics. Only trade_history is written by this class; the other
        # lists are left for callers to fill.
        self.trade_history = []
        self.imbalance_history = []
        self.spread_history = []
        self.depth_history = []

    @staticmethod
    def _insert_price(prices: list, price: float, descending: bool) -> None:
        """Insert ``price`` into the already-sorted ``prices`` list in place."""
        if not descending:
            bisect.insort(prices, price)
            return
        # bisect assumes ascending order, so binary-search the descending
        # list by hand instead of inserting and re-sorting the whole list.
        lo, hi = 0, len(prices)
        while lo < hi:
            mid = (lo + hi) // 2
            if prices[mid] > price:
                lo = mid + 1
            else:
                hi = mid
        prices.insert(lo, price)

    @staticmethod
    def _level_quantity(orders) -> int:
        """Return the summed quantity of the orders resting at one level."""
        return sum(o.quantity for o in orders)

    def add_order(self, order: "OrderBookEntry"):
        """Append a limit order to the back of its price level's queue.

        Any ``side`` other than 'bid' is stored on the ask side.  If
        ``order.order_id`` is already resting in the book, the old order is
        removed first so that ``order_map`` never points away from an order
        that is still in the book (which would make it impossible to cancel).
        """
        if order.order_id in self.order_map:
            self.cancel_order(order.order_id)

        is_bid = order.side == 'bid'
        side_dict = self.bids if is_bid else self.asks
        if order.price not in side_dict:
            side_dict[order.price] = []
            price_list = self.bid_prices if is_bid else self.ask_prices
            self._insert_price(price_list, order.price, descending=is_bid)
        side_dict[order.price].append(order)
        self.order_map[order.order_id] = (order.side, order.price)

    def cancel_order(self, order_id: str):
        """Remove a resting order.

        Returns:
            True if ``order_id`` was known to the book, False otherwise.
        """
        if order_id not in self.order_map:
            return False
        side, price = self.order_map[order_id]
        is_bid = side == 'bid'
        side_dict = self.bids if is_bid else self.asks
        if price in side_dict:
            side_dict[price] = [o for o in side_dict[price]
                                if o.order_id != order_id]
            if not side_dict[price]:
                # Last order at this level: drop the level itself.
                del side_dict[price]
                price_list = self.bid_prices if is_bid else self.ask_prices
                price_list.remove(price)
        del self.order_map[order_id]
        return True

    def execute_trade(self, side: str, quantity: int,
                      aggressive: bool = True) -> Tuple[float, int]:
        """Match an incoming order against the opposite side of the book.

        Args:
            side: side of the incoming order; 'bid' consumes asks, anything
                else consumes bids.
            quantity: maximum quantity to execute.
            aggressive: stored on the trade record only; it does not change
                how matching is done.

        Returns:
            (avg_price, executed_qty).  ``avg_price`` is the quantity-weighted
            fill price, or 0.0 when nothing was executed.
        """
        remaining = quantity
        total_cost = 0.0
        hits_asks = side == 'bid'
        opposite_dict = self.asks if hits_asks else self.bids
        price_list = self.ask_prices if hits_asks else self.bid_prices

        while remaining > 0 and price_list:
            best_price = price_list[0]
            if best_price not in opposite_dict:
                # Stale price with no level behind it: discard and move on.
                price_list.pop(0)
                continue
            level_orders = opposite_dict[best_price]
            while remaining > 0 and level_orders:
                order = level_orders[0]
                exec_qty = min(remaining, order.quantity)
                total_cost += exec_qty * best_price
                remaining -= exec_qty
                order.quantity -= exec_qty
                if order.quantity <= 0:
                    # Fully filled: remove from the queue and the id index.
                    level_orders.pop(0)
                    self.order_map.pop(order.order_id, None)
            if not level_orders:
                del opposite_dict[best_price]
                price_list.pop(0)

        executed = quantity - remaining
        avg_price = total_cost / executed if executed > 0 else 0.0
        # Record trade
        if executed > 0:
            self.trade_history.append({
                'side': side,
                'quantity': executed,
                'avg_price': avg_price,
                'aggressive': aggressive
            })
        return avg_price, executed

    def get_best_bid(self) -> Optional[float]:
        """Return the highest bid price, or None when there are no bids."""
        return self.bid_prices[0] if self.bid_prices else None

    def get_best_ask(self) -> Optional[float]:
        """Return the lowest ask price, or None when there are no asks."""
        return self.ask_prices[0] if self.ask_prices else None

    def get_mid_price(self) -> Optional[float]:
        """Return (best bid + best ask) / 2, or None if either side is empty."""
        bb = self.get_best_bid()
        ba = self.get_best_ask()
        if bb is not None and ba is not None:
            return (bb + ba) / 2
        return None

    def get_spread(self) -> Optional[float]:
        """Return best ask - best bid, or None if either side is empty."""
        bb = self.get_best_bid()
        ba = self.get_best_ask()
        if bb is not None and ba is not None:
            return ba - bb
        return None

    def get_spread_bps(self) -> Optional[float]:
        """Return the spread in basis points of the mid price.

        Returns None when either side is empty or the mid price is zero
        (the ratio is undefined there).
        """
        spread = self.get_spread()
        mid = self.get_mid_price()
        if spread is None or mid is None or mid == 0:
            return None
        return (spread / mid) * 10000

    def _side_snapshot(self, prices, levels, side: str, depth: int) -> List[Dict]:
        """Aggregate the top ``depth`` price levels of one side."""
        rows = []
        for p in prices[:depth]:
            orders = levels.get(p)
            if orders is None:
                continue
            rows.append({
                'price': p,
                'quantity': self._level_quantity(orders),
                'num_orders': len(orders),
                'side': side
            })
        return rows

    def get_book_snapshot(self, depth: Optional[int] = None) -> Dict:
        """Return an aggregated view of the book.

        Args:
            depth: number of price levels per side to include in 'bids' and
                'asks'; None means ``self.max_depth``.  An explicit 0 yields
                empty level lists.

        Returns:
            Dict with per-level 'bids' / 'asks' (best first), 'mid_price',
            'spread', 'spread_bps', the number of populated levels per side
            ('bid_depth', 'ask_depth') and the total resting quantity per
            side across ALL levels, not just the top ``depth``.
        """
        if depth is None:
            depth = self.max_depth
        return {
            'bids': self._side_snapshot(self.bid_prices, self.bids, 'bid', depth),
            'asks': self._side_snapshot(self.ask_prices, self.asks, 'ask', depth),
            'mid_price': self.get_mid_price(),
            'spread': self.get_spread(),
            'spread_bps': self.get_spread_bps(),
            'bid_depth': len(self.bid_prices),
            'ask_depth': len(self.ask_prices),
            'total_bid_quantity': sum(self._level_quantity(self.bids[p])
                                      for p in self.bid_prices),
            'total_ask_quantity': sum(self._level_quantity(self.asks[p])
                                      for p in self.ask_prices)
        }

    def get_order_imbalance(self, levels: int = 5) -> float:
        """Return (bid_qty - ask_qty) / (bid_qty + ask_qty) over the top levels.

        The result lies in [-1, 1]: positive when more quantity rests on the
        bid side, negative when more rests on the ask side, and 0.0 when the
        top ``levels`` levels hold no quantity at all.
        """
        bid_qty = sum(
            self._level_quantity(self.bids[p])
            for p in self.bid_prices[:levels] if p in self.bids
        )
        ask_qty = sum(
            self._level_quantity(self.asks[p])
            for p in self.ask_prices[:levels] if p in self.asks
        )
        total = bid_qty + ask_qty
        if total == 0:
            return 0.0
        return (bid_qty - ask_qty) / total
class LOBFeatures:
    """Feature extractors computed from a reconstructed ``LimitOrderBook``.

    Every method is a ``staticmethod``: the class is only a namespace and
    holds no state, so the functions can be called either on the class or on
    an instance.
    """

    @staticmethod
    def price_levels(book: "LimitOrderBook", n: int = 10) -> pd.DataFrame:
        """Return the top ``n`` levels per side as a DataFrame.

        Columns are always ``side, level, price, quantity, num_orders`` (also
        for an empty book).  Bids come first, then asks; ``level`` is 1 for
        the best price on each side.
        """
        snapshot = book.get_book_snapshot(depth=n)
        rows = []
        for side, key in (('bid', 'bids'), ('ask', 'asks')):
            for rank, level in enumerate(snapshot[key], start=1):
                rows.append({
                    'side': side,
                    'level': rank,
                    'price': level['price'],
                    'quantity': level['quantity'],
                    'num_orders': level['num_orders']
                })
        return pd.DataFrame(
            rows, columns=['side', 'level', 'price', 'quantity', 'num_orders'])

    @staticmethod
    def _cum_depth(cum_qty, levels: int):
        """Cumulative quantity at ``levels`` deep, or at the deepest level available."""
        if len(cum_qty) == 0:
            return 0
        return cum_qty[min(levels, len(cum_qty)) - 1]

    @staticmethod
    def _depth_ratio(cum_bid_qty, cum_ask_qty, levels: int) -> float:
        """Bid/ask cumulative-quantity ratio at ``levels``; 1.0 when undefined."""
        idx = levels - 1
        if (len(cum_bid_qty) > idx and len(cum_ask_qty) > idx
                and cum_ask_qty[idx] > 0):
            return cum_bid_qty[idx] / cum_ask_qty[idx]
        return 1.0

    @staticmethod
    def depth_profile(book: "LimitOrderBook") -> Dict:
        """Summarise cumulative resting quantity on each side.

        Returns:
            Dict with cumulative bid/ask quantity at 1, 5 and 10 levels (a
            side with fewer levels reports its deepest cumulative value, an
            empty side reports 0), bid/ask ratios at 1 and 5 levels (1.0 when
            either side lacks that many levels or the ask quantity is zero),
            and ``depth_skew`` computed from whole-book totals.
        """
        snapshot = book.get_book_snapshot()
        # Cumulative depth
        cum_bid_qty = np.cumsum([b['quantity'] for b in snapshot['bids']])
        cum_ask_qty = np.cumsum([a['quantity'] for a in snapshot['asks']])

        total_bid = snapshot['total_bid_quantity']
        total_ask = snapshot['total_ask_quantity']
        return {
            'bid_depth_1': LOBFeatures._cum_depth(cum_bid_qty, 1),
            'bid_depth_5': LOBFeatures._cum_depth(cum_bid_qty, 5),
            'bid_depth_10': LOBFeatures._cum_depth(cum_bid_qty, 10),
            'ask_depth_1': LOBFeatures._cum_depth(cum_ask_qty, 1),
            'ask_depth_5': LOBFeatures._cum_depth(cum_ask_qty, 5),
            'ask_depth_10': LOBFeatures._cum_depth(cum_ask_qty, 10),
            'depth_ratio_1': LOBFeatures._depth_ratio(cum_bid_qty, cum_ask_qty, 1),
            'depth_ratio_5': LOBFeatures._depth_ratio(cum_bid_qty, cum_ask_qty, 5),
            # The +1 keeps the denominator non-zero for an empty book.
            'depth_skew': (total_bid - total_ask) / (total_bid + total_ask + 1)
        }

    @staticmethod
    def queue_features(book: "LimitOrderBook") -> Dict:
        """Describe the queues at the best bid and best ask.

        Returns order counts and total quantity at the top level of each
        side (0 for an empty side) plus ``queue_imbalance``, the bid order
        count minus the ask order count.
        """
        snapshot = book.get_book_snapshot(depth=1)
        best_bid = snapshot['bids'][0] if snapshot['bids'] else None
        best_ask = snapshot['asks'][0] if snapshot['asks'] else None
        bid_orders = best_bid['num_orders'] if best_bid else 0
        ask_orders = best_ask['num_orders'] if best_ask else 0
        return {
            'bid_queue_length': bid_orders,
            'ask_queue_length': ask_orders,
            'bid_queue_qty': best_bid['quantity'] if best_bid else 0,
            'ask_queue_qty': best_ask['quantity'] if best_ask else 0,
            'queue_imbalance': bid_orders - ask_orders
        }

    @staticmethod
    def large_order_detection(book: "LimitOrderBook",
                              threshold_qty: float = 1000,
                              threshold_pct: float = 0.3) -> List[Dict]:
        """List resting orders whose quantity is at least ``threshold_qty``.

        Args:
            book: order book to scan (all levels on both sides).
            threshold_qty: minimum order quantity to report.
            threshold_pct: accepted for interface compatibility; currently
                not used by the detection.

        Returns:
            One dict per large order, sorted by quantity descending, with the
            order's share of its price level (``pct_of_level``) and
            ``is_iceberg``, which is True when the order is more than three
            times the mean order size at its level.
        """
        large_orders = []
        for side, side_name in [(book.bids, 'bid'), (book.asks, 'ask')]:
            for price, orders in side.items():
                total_at_price = sum(o.quantity for o in orders)
                avg_qty = np.mean([o.quantity for o in orders]) if orders else 0
                for order in orders:
                    if order.quantity < threshold_qty:
                        continue
                    large_orders.append({
                        'side': side_name,
                        'price': price,
                        'quantity': order.quantity,
                        'pct_of_level': order.quantity / total_at_price if total_at_price > 0 else 0,
                        'is_iceberg': bool(order.quantity > avg_qty * 3)  # Likely iceberg
                    })
        return sorted(large_orders, key=lambda x: x['quantity'], reverse=True)

    @staticmethod
    def spread_dynamics(book_history: List["LimitOrderBook"],
                        window: int = 10) -> Dict:
        """Summarise spread, mid price and imbalance over recent books.

        Args:
            book_history: books in chronological order.
            window: number of most recent books to use.

        Returns:
            Empty dict when fewer than two books in the window have a defined
            spread; otherwise mean / std / first-to-last change of the spread
            in bps, the rank of the latest spread within the window, the mid
            price change in percent, and mean / first-to-last change of the
            order imbalance.
        """
        spreads = []
        mids = []
        imbalances = []
        for book in book_history[-window:]:
            s = book.get_spread_bps()
            m = book.get_mid_price()
            if s is not None:
                spreads.append(s)
            if m is not None:
                mids.append(m)
            # NOTE(review): the source's indentation was lost; the imbalance
            # is assumed to be recorded for every book - confirm.
            imbalances.append(book.get_order_imbalance())
        if len(spreads) < 2:
            return {}
        latest = spreads[-1]
        return {
            'avg_spread_bps': np.mean(spreads),
            'spread_volatility': np.std(spreads),
            'spread_trend': latest - spreads[0],
            'spread_percentile': sum(1 for s in spreads if s <= latest) / len(spreads),
            'mid_price_change_pct': (mids[-1] / mids[0] - 1) * 100 if len(mids) >= 2 and mids[0] > 0 else 0,
            'avg_imbalance': np.mean(imbalances),
            'imbalance_trend': imbalances[-1] - imbalances[0]
        }

    @staticmethod
    def order_flow_toxicity(book: "LimitOrderBook",
                            trade_history: List[Dict],
                            window: int = 50) -> Dict:
        """Return a VPIN-like imbalance of buy versus sell volume.

        Each of the last ``window`` trades is assigned to exactly one bucket
        by its 'side' field: 'bid' counts as buy volume and 'ask' as sell
        volume; trades with any other side are ignored.  Assigning a trade to
        both buckets would bias the ratio.

        Args:
            book: accepted for interface compatibility; not used.
            trade_history: trade dicts with 'side' and 'quantity' keys.
            window: number of most recent trades to consider.

        Returns:
            ``{'vpin_approx': 0.0, 'toxicity': 0.0}`` when there is no
            classified volume; otherwise those two keys (both equal to
            ``|buy - sell| / (buy + sell)``) plus 'buy_volume', 'sell_volume'
            and 'total_volume'.
        """
        if not trade_history:
            return {'vpin_approx': 0.0, 'toxicity': 0.0}
        recent_trades = trade_history[-window:]
        buy_volume = sum(t['quantity'] for t in recent_trades
                         if t.get('side') == 'bid')
        sell_volume = sum(t['quantity'] for t in recent_trades
                          if t.get('side') == 'ask')
        total = buy_volume + sell_volume
        if total == 0:
            return {'vpin_approx': 0.0, 'toxicity': 0.0}
        # Toxicity = |buy_vol - sell_vol| / total
        vpin = abs(buy_volume - sell_volume) / total
        return {
            'vpin_approx': vpin,
            'toxicity': vpin,
            'buy_volume': buy_volume,
            'sell_volume': sell_volume,
            'total_volume': total
        }

    @staticmethod
    def all_features(book: "LimitOrderBook",
                     book_history: Optional[List["LimitOrderBook"]] = None) -> Dict:
        """Compute all LOB features at once.

        Combines the snapshot basics with depth-profile features (keys
        prefixed ``depth_``), queue features (prefixed ``queue_``), large
        order counts and, when ``book_history`` holds at least two books,
        spread dynamics (prefixed ``spread_dyn_``).
        """
        features = {}
        # Basic features
        snapshot = book.get_book_snapshot()
        features['mid_price'] = snapshot['mid_price']
        features['spread'] = snapshot['spread']
        features['spread_bps'] = snapshot['spread_bps']
        features['bid_depth_total'] = snapshot['total_bid_quantity']
        features['ask_depth_total'] = snapshot['total_ask_quantity']
        features['depth_imbalance'] = book.get_order_imbalance()
        # Depth profile
        depth = LOBFeatures.depth_profile(book)
        features.update({f'depth_{k}': v for k, v in depth.items()})
        # Queue features
        queue = LOBFeatures.queue_features(book)
        features.update({f'queue_{k}': v for k, v in queue.items()})
        # Large orders
        large = LOBFeatures.large_order_detection(book)
        features['n_large_orders'] = len(large)
        features['large_order_total_qty'] = sum(o['quantity'] for o in large)
        # Spread dynamics
        if book_history and len(book_history) >= 2:
            dynamics = LOBFeatures.spread_dynamics(book_history)
            features.update({f'spread_dyn_{k}': v for k, v in dynamics.items()})
        return features
def generate_synthetic_lob_feed(n_messages: int = 1000,
                                base_price: float = 100.0,
                                tick_size: float = 0.01,
                                seed: int = 42) -> List[Dict]:
    """Generate a reproducible synthetic LOB message feed for testing.

    The feed starts with up to 50 'add' messages that seed the book (bids at
    or below ``base_price``, asks at or above it) and continues with a random
    mix of 'add' (50%), 'cancel' (30%) and 'trade' (20%) messages.  Cancels
    pick any previously issued order id, so they may reference an order that
    is no longer resting.

    Args:
        n_messages: exact number of messages to return; values below 50
            shrink the seeding phase instead of being rounded up to 50.
        base_price: price around which orders are placed.
        tick_size: price increment used for the offsets from ``base_price``.
        seed: seed for the private random generator.  The default reproduces
            the stream obtained from ``np.random.seed(42)``.

    Returns:
        List of message dicts with plain Python ``int`` / ``float`` values;
        timestamps are the message index divided by 1000.
    """
    # A private generator keeps the feed reproducible without reseeding the
    # process-wide numpy RNG as a side effect.
    rng = np.random.RandomState(seed)
    n_messages = max(int(n_messages), 0)
    n_seed_orders = min(50, n_messages)

    messages = []
    order_counter = 0

    # Initialize with some orders
    for _ in range(n_seed_orders):
        side = 'bid' if rng.rand() < 0.5 else 'ask'
        price = base_price + rng.randint(-50, 50) * tick_size
        # Keep asks at/above and bids at/below the base price.
        price = max(price, base_price) if side == 'ask' else min(price, base_price)
        messages.append({
            'type': 'add',
            'order_id': f'order_{order_counter}',
            'side': side,
            'price': float(round(price, 2)),
            'quantity': int(rng.randint(100, 1000)),
            'timestamp': len(messages) / 1000.0
        })
        order_counter += 1

    # Generate flowing messages
    for _ in range(n_messages - n_seed_orders):
        msg_type = rng.choice(['add', 'cancel', 'trade'], p=[0.5, 0.3, 0.2])
        if msg_type == 'add':
            side = 'bid' if rng.rand() < 0.5 else 'ask'
            offset = rng.exponential(10) * tick_size
            price = base_price + (offset if side == 'ask' else -offset)
            messages.append({
                'type': 'add',
                'order_id': f'order_{order_counter}',
                'side': side,
                'price': float(round(max(price, 0.01), 2)),
                'quantity': int(rng.randint(100, 2000)),
                'timestamp': len(messages) / 1000.0
            })
            order_counter += 1
        elif msg_type == 'cancel' and order_counter > 0:
            # Cancel a random previously issued order id
            messages.append({
                'type': 'cancel',
                'order_id': f'order_{rng.randint(0, order_counter)}',
                'timestamp': len(messages) / 1000.0
            })
        else:
            # Trade
            side = 'bid' if rng.rand() < 0.5 else 'ask'
            messages.append({
                'type': 'trade',
                'side': side,
                'quantity': int(rng.randint(100, 500)),
                'timestamp': len(messages) / 1000.0
            })
    return messages
def process_message_feed(messages: List[Dict],
                         snapshot_interval: int = 100
                         ) -> Tuple["LimitOrderBook", List[Dict], List[Dict]]:
    """Replay a message feed into a fresh book and sample features along the way.

    Message handling:
        'add'    -> a new ``OrderBookEntry`` is added to the book.
        'cancel' -> the referenced order is removed (unknown ids are ignored).
        'trade'  -> ``msg['side']`` is flipped before being handed to
                    ``execute_trade``, which matches against the side
                    opposite to the one it receives; the liquidity consumed
                    therefore rests on ``msg['side']``.
        Any other type is ignored.

    Args:
        messages: feed of message dicts, in chronological order.
        snapshot_interval: a feature snapshot is taken after the first
            message and then after every ``snapshot_interval``-th message.

    Returns:
        (book, trades, features_history): the final book, one record per
        trade message (including those that filled nothing, with quantity 0)
        and one feature dict per snapshot.

    Raises:
        ValueError: if ``snapshot_interval`` is not positive.
    """
    if snapshot_interval <= 0:
        raise ValueError("snapshot_interval must be a positive integer")

    book = LimitOrderBook(max_depth=20)
    trades = []
    book_history = []
    features_history = []

    for index, msg in enumerate(messages):
        msg_type = msg['type']
        if msg_type == 'add':
            book.add_order(OrderBookEntry(
                price=msg['price'],
                quantity=msg['quantity'],
                order_id=msg['order_id'],
                side=msg['side'],
                timestamp=msg['timestamp']
            ))
        elif msg_type == 'cancel':
            book.cancel_order(msg['order_id'])
        elif msg_type == 'trade':
            side = 'bid' if msg['side'] == 'ask' else 'ask'  # Opposite side
            avg_price, qty = book.execute_trade(side, msg['quantity'], aggressive=True)
            trades.append({
                'timestamp': msg['timestamp'],
                'side': msg['side'],
                'quantity': qty,
                'avg_price': avg_price
            })

        # Snapshot every `snapshot_interval` messages.  The trigger is the
        # message index: a test on len(book_history) stops firing after the
        # first append.
        if index % snapshot_interval == 0:
            # Store a frozen copy; appending the live book would make every
            # history entry alias the same, final state.
            book_history.append(copy.deepcopy(book))
            features = LOBFeatures.all_features(book, book_history)
            features['timestamp'] = msg['timestamp']
            features_history.append(features)

    return book, trades, features_history
def main():
    """Run the demo: build a synthetic feed, replay it and print a summary."""
    print("=" * 70)
    print(" LIMIT ORDER BOOK RECONSTRUCTION")
    print("=" * 70)
    # Generate synthetic data
    messages = generate_synthetic_lob_feed(n_messages=5000)
    # Process
    book, trades, features = process_message_feed(messages)
    # Final snapshot
    snapshot = book.get_book_snapshot(depth=5)
    # mid_price / spread_bps are None when one side of the book is empty;
    # formatting None with a float spec would raise TypeError.
    mid = snapshot['mid_price']
    spread_bps = snapshot['spread_bps']
    mid_text = f"${mid:.2f}" if mid is not None else "n/a"
    spread_text = f"{spread_bps:.1f} bps" if spread_bps is not None else "n/a"
    print("\nFinal LOB State:")
    print(f" Mid Price: {mid_text}")
    print(f" Spread: {spread_text}")
    print(f" Bid Depth: {snapshot['bid_depth']} levels")
    print(f" Ask Depth: {snapshot['ask_depth']} levels")
    print(f" Total Bid Qty: {snapshot['total_bid_quantity']:,}")
    print(f" Total Ask Qty: {snapshot['total_ask_quantity']:,}")
    print(f" Order Imbalance: {book.get_order_imbalance():.3f}")
    # Level 2
    print("\nLevel 2 Book (top 5):")
    levels = LOBFeatures.price_levels(book, n=5)
    print(levels.to_string())
    # Features
    if features:
        print("\nLatest LOB Features:")
        latest = features[-1]
        # numpy integers are not instances of int, so list the numpy scalar
        # types explicitly or those features are skipped.
        numeric_types = (int, float, np.integer, np.floating)
        for k, v in latest.items():
            if isinstance(v, numeric_types):
                print(f" {k}: {v:.4f}")
    print(f"\n Trades executed: {len(trades)}")
    print(f" Total messages processed: {len(messages)}")
    print("\n This is what Jane Street sees every microsecond.")
    print(" Order imbalance, queue position, depth profile = PURE ALPHA.")


if __name__ == '__main__':
    main()