# alphaforge-quant-system / limit_order_book.py
# Premchan369's picture
# Add Level 2 LOB reconstruction with full order book, queue position, depth profile, spread dynamics
# 2b738f4 verified
"""Limit Order Book (LOB) Reconstruction and Level 2 Features
What Jane Street sees that retail doesn't:
- Full Level 2 order book (10+ price levels, not just best bid/ask)
- Queue position for each order
- Order arrival/cancel rates
- Market depth profile
- Spread dynamics (widening = informed trading)
- Large order detection
This is the foundation of HIGH-FREQUENCY alpha.
"""
import bisect
import copy
import warnings
from collections import defaultdict
from typing import Dict, List, Tuple, Optional, NamedTuple

import numpy as np
import pandas as pd
# Suppress every warning category (no category/module filter is given).
# NOTE(review): this runs at import time and the warnings filter is
# process-wide, so it also silences warnings for code importing this module.
warnings.filterwarnings('ignore')
class OrderBookEntry:
    """One resting limit order stored at a price level of the book."""

    def __init__(self, price: float, quantity: int, order_id: str,
                 side: str, timestamp: float):
        # Price and remaining size of the order.
        self.price, self.quantity = price, quantity
        # Identifier used to look the order up again later.
        self.order_id = order_id
        # 'bid' or 'ask'
        self.side = side
        self.timestamp = timestamp

    def __repr__(self):
        # Rendered as "<SIDE> <qty>@<price with 2 decimals>".
        label = self.side.upper()
        return "{} {}@{:.2f}".format(label, self.quantity, self.price)
class LimitOrderBook:
    """
    Limit order book rebuilt from add / cancel / trade messages.

    State kept per instance:
    - bids / asks: price -> list of resting orders at that price, in
      arrival order (the first element is matched first).
    - bid_prices (descending) / ask_prices (ascending): sorted price
      levels, so index 0 is always the best price of each side.
    - order_map: order_id -> (side, price), used to find an order on cancel.

    add_order never matches an incoming order against the opposite side;
    only execute_trade removes liquidity from the book.
    """

    def __init__(self, max_depth: int = 10):
        """Create an empty book; max_depth is the default snapshot depth."""
        self.max_depth = max_depth
        self.bids = {}  # price -> list of OrderBookEntry
        self.asks = {}  # price -> list of OrderBookEntry
        self.bid_prices = []  # Sorted descending
        self.ask_prices = []  # Sorted ascending
        self.order_map = {}  # order_id -> (side, price)
        # Statistics. Only trade_history is written by this class; the other
        # lists are left for callers to fill.
        self.trade_history = []
        self.imbalance_history = []
        self.spread_history = []
        self.depth_history = []

    @staticmethod
    def _insert_price_level(price_list, price, descending):
        """Insert price into an already sorted list, keeping its order."""
        if not descending:
            bisect.insort(price_list, price)
            return
        # bisect assumes ascending order, so the slot in a descending list
        # is located with an explicit binary search instead of re-sorting.
        lo, hi = 0, len(price_list)
        while lo < hi:
            mid = (lo + hi) // 2
            if price_list[mid] > price:
                lo = mid + 1
            else:
                hi = mid
        price_list.insert(lo, price)

    @staticmethod
    def _level_quantity(orders):
        """Total remaining quantity of the orders resting at one level."""
        return sum(o.quantity for o in orders)

    def _side_snapshot(self, levels, prices, depth, side):
        """Aggregate the best `depth` price levels of one side."""
        return [
            {
                'price': p,
                'quantity': self._level_quantity(levels[p]),
                'num_orders': len(levels[p]),
                'side': side,
            }
            for p in prices[:depth] if p in levels
        ]

    def add_order(self, order: "OrderBookEntry"):
        """
        Add a resting limit order to the back of the queue at its price.

        Any side other than 'bid' is stored on the ask side. order_id is
        expected to be unique: re-using an id repoints order_map to the
        newest order, so the earlier one can no longer be cancelled by id.
        """
        is_bid = order.side == 'bid'
        levels = self.bids if is_bid else self.asks
        if order.price not in levels:
            # First order at this price: open a new level.
            levels[order.price] = []
            self._insert_price_level(
                self.bid_prices if is_bid else self.ask_prices,
                order.price,
                descending=is_bid,
            )
        levels[order.price].append(order)
        self.order_map[order.order_id] = (order.side, order.price)

    def cancel_order(self, order_id: str):
        """
        Cancel a resting order by id.

        Returns True when the id was known (and has now been removed),
        False when the id is unknown, e.g. already cancelled or fully filled.
        """
        if order_id not in self.order_map:
            return False
        side, price = self.order_map[order_id]
        side_dict = self.bids if side == 'bid' else self.asks
        if price in side_dict:
            side_dict[price] = [o for o in side_dict[price] if o.order_id != order_id]
            if not side_dict[price]:
                # Last order at this level: drop the level entirely.
                del side_dict[price]
                price_list = self.bid_prices if side == 'bid' else self.ask_prices
                price_list.remove(price)
        del self.order_map[order_id]
        return True

    def execute_trade(self, side: str, quantity: int,
                      aggressive: bool = True) -> Tuple[float, int]:
        """
        Execute an incoming order against the opposite side of the book.

        side='bid' consumes asks starting from the lowest price; any other
        value consumes bids starting from the highest price. Within a level,
        orders are filled in arrival order. The `aggressive` flag does not
        affect matching; it is only stored in the trade record.

        Returns: (avg_price, executed_qty); avg_price is 0.0 when nothing
        could be executed. Each call that executes a non-zero quantity
        appends one record to trade_history.
        """
        remaining = quantity
        total_cost = 0.0
        # Match against opposite side
        if side == 'bid':
            opposite_dict, price_list = self.asks, self.ask_prices
        else:
            opposite_dict, price_list = self.bids, self.bid_prices
        while remaining > 0 and price_list:
            best_price = price_list[0]
            if best_price not in opposite_dict:
                # Stale price without a level behind it: discard and retry.
                price_list.pop(0)
                continue
            level_orders = opposite_dict[best_price]
            while remaining > 0 and level_orders:
                order = level_orders[0]
                exec_qty = min(remaining, order.quantity)
                total_cost += exec_qty * best_price
                remaining -= exec_qty
                order.quantity -= exec_qty
                if order.quantity <= 0:
                    # Fully filled: remove from the queue and the id lookup.
                    level_orders.pop(0)
                    if order.order_id in self.order_map:
                        del self.order_map[order.order_id]
            if not level_orders:
                del opposite_dict[best_price]
                price_list.pop(0)
        executed = quantity - remaining
        avg_price = total_cost / executed if executed > 0 else 0.0
        # Record trade
        if executed > 0:
            self.trade_history.append({
                'side': side,
                'quantity': executed,
                'avg_price': avg_price,
                'aggressive': aggressive
            })
        return avg_price, executed

    def get_best_bid(self) -> Optional[float]:
        """Highest bid price, or None when there are no bids."""
        return self.bid_prices[0] if self.bid_prices else None

    def get_best_ask(self) -> Optional[float]:
        """Lowest ask price, or None when there are no asks."""
        return self.ask_prices[0] if self.ask_prices else None

    def get_mid_price(self) -> Optional[float]:
        """Average of best bid and best ask, or None if either side is empty."""
        bb = self.get_best_bid()
        ba = self.get_best_ask()
        if bb is not None and ba is not None:
            return (bb + ba) / 2
        return None

    def get_spread(self) -> Optional[float]:
        """Best ask minus best bid, or None if either side is empty."""
        bb = self.get_best_bid()
        ba = self.get_best_ask()
        if bb is not None and ba is not None:
            return ba - bb
        return None

    def get_spread_bps(self) -> Optional[float]:
        """
        Spread relative to the mid price, in basis points.

        Returns None when either side is empty or when the mid price is 0,
        where a relative spread is undefined.
        """
        spread = self.get_spread()
        mid = self.get_mid_price()
        if spread is None or mid is None or mid == 0:
            return None
        return (spread / mid) * 10000

    def get_book_snapshot(self, depth: Optional[int] = None) -> Dict:
        """
        Get a snapshot of the book.

        depth limits only the per-level 'bids' / 'asks' lists (None means
        max_depth, 0 yields empty lists). 'bid_depth' / 'ask_depth' and the
        total quantities always cover every level currently in the book.
        """
        if depth is None:
            depth = self.max_depth
        return {
            'bids': self._side_snapshot(self.bids, self.bid_prices, depth, 'bid'),
            'asks': self._side_snapshot(self.asks, self.ask_prices, depth, 'ask'),
            'mid_price': self.get_mid_price(),
            'spread': self.get_spread(),
            'spread_bps': self.get_spread_bps(),
            'bid_depth': len(self.bid_prices),
            'ask_depth': len(self.ask_prices),
            'total_bid_quantity': sum(self._level_quantity(self.bids[p])
                                      for p in self.bid_prices),
            'total_ask_quantity': sum(self._level_quantity(self.asks[p])
                                      for p in self.ask_prices)
        }

    def get_order_imbalance(self, levels: int = 5) -> float:
        """
        Order imbalance over the best `levels` price levels of each side.

        Computed as (bid_qty - ask_qty) / (bid_qty + ask_qty):
        positive = more resting bid quantity, negative = more resting ask
        quantity. Returns 0.0 when both sides are empty.
        """
        bid_qty = sum(
            self._level_quantity(self.bids[p])
            for p in self.bid_prices[:levels] if p in self.bids
        )
        ask_qty = sum(
            self._level_quantity(self.asks[p])
            for p in self.ask_prices[:levels] if p in self.asks
        )
        total = bid_qty + ask_qty
        if total == 0:
            return 0.0
        return (bid_qty - ask_qty) / total
class LOBFeatures:
    """
    Feature extraction helpers for a reconstructed limit order book.

    Every method is static and only reads the book through its public
    accessors (get_book_snapshot, get_spread_bps, get_mid_price,
    get_order_imbalance) or its bids / asks dictionaries.
    """

    @staticmethod
    def _depth_at(cumulative, n):
        """Cumulative quantity over the best n levels (or all, if fewer)."""
        if len(cumulative) >= n:
            return cumulative[n - 1]
        return cumulative[-1] if len(cumulative) > 0 else 0

    @staticmethod
    def _ratio_at(cum_bid, cum_ask, n):
        """Bid/ask cumulative-depth ratio at level n; 1.0 when undefined."""
        if len(cum_bid) >= n and len(cum_ask) >= n and cum_ask[n - 1] > 0:
            return cum_bid[n - 1] / cum_ask[n - 1]
        return 1.0

    @staticmethod
    def price_levels(book: "LimitOrderBook", n: int = 10) -> pd.DataFrame:
        """
        Price level data (Level 2 equivalent) for the best n levels per side.

        Returns one row per level with columns side, level (1 = best),
        price, quantity and num_orders; bids come first, then asks. An
        empty book yields an empty DataFrame without columns.
        """
        snapshot = book.get_book_snapshot(depth=n)
        rows = [
            {
                'side': side,
                'level': rank,
                'price': level['price'],
                'quantity': level['quantity'],
                'num_orders': level['num_orders']
            }
            for side, key in (('bid', 'bids'), ('ask', 'asks'))
            for rank, level in enumerate(snapshot[key], start=1)
        ]
        return pd.DataFrame(rows)

    @staticmethod
    def depth_profile(book: "LimitOrderBook") -> Dict:
        """
        Cumulative market depth across price levels.

        Uses the book's default snapshot depth. bid/ask_depth_N is the
        cumulative quantity over the best N levels (or over all available
        levels when fewer exist, 0 for an empty side). depth_ratio_N is
        bid depth / ask depth at level N and falls back to 1.0 when either
        side has fewer than N levels or the ask depth is 0. depth_skew uses
        the whole-book totals; the +1 in its denominator avoids division by
        zero on an empty book.
        """
        snapshot = book.get_book_snapshot()
        cum_bid_qty = np.cumsum([b['quantity'] for b in snapshot['bids']])
        cum_ask_qty = np.cumsum([a['quantity'] for a in snapshot['asks']])
        total_bid = snapshot['total_bid_quantity']
        total_ask = snapshot['total_ask_quantity']
        return {
            'bid_depth_1': LOBFeatures._depth_at(cum_bid_qty, 1),
            'bid_depth_5': LOBFeatures._depth_at(cum_bid_qty, 5),
            'bid_depth_10': LOBFeatures._depth_at(cum_bid_qty, 10),
            'ask_depth_1': LOBFeatures._depth_at(cum_ask_qty, 1),
            'ask_depth_5': LOBFeatures._depth_at(cum_ask_qty, 5),
            'ask_depth_10': LOBFeatures._depth_at(cum_ask_qty, 10),
            'depth_ratio_1': LOBFeatures._ratio_at(cum_bid_qty, cum_ask_qty, 1),
            'depth_ratio_5': LOBFeatures._ratio_at(cum_bid_qty, cum_ask_qty, 5),
            'depth_skew': (total_bid - total_ask) / (total_bid + total_ask + 1)
        }

    @staticmethod
    def queue_features(book: "LimitOrderBook") -> Dict:
        """
        Queue features at the best bid and best ask.

        Queue length is the number of resting orders at the best price,
        queue qty their total quantity; queue_imbalance is the bid queue
        length minus the ask queue length. Missing sides count as 0.
        """
        snapshot = book.get_book_snapshot(depth=1)
        best_bid = snapshot['bids'][0] if snapshot['bids'] else None
        best_ask = snapshot['asks'][0] if snapshot['asks'] else None
        bid_len = best_bid['num_orders'] if best_bid else 0
        ask_len = best_ask['num_orders'] if best_ask else 0
        return {
            'bid_queue_length': bid_len,
            'ask_queue_length': ask_len,
            'bid_queue_qty': best_bid['quantity'] if best_bid else 0,
            'ask_queue_qty': best_ask['quantity'] if best_ask else 0,
            'queue_imbalance': bid_len - ask_len
        }

    @staticmethod
    def large_order_detection(book: "LimitOrderBook",
                              threshold_qty: float = 1000,
                              threshold_pct: float = 0.3) -> List[Dict]:
        """
        List resting orders whose quantity is at least threshold_qty.

        Each hit reports its side, price, quantity, its share of the total
        quantity at that price (pct_of_level) and an is_iceberg flag, set
        when the order is more than 3x the average order size at its level.
        Results are sorted by quantity, largest first.

        threshold_pct is accepted for interface compatibility but is not
        applied as a filter.
        """
        large_orders = []
        for side, side_name in [(book.bids, 'bid'), (book.asks, 'ask')]:
            for price, orders in side.items():
                total_at_price = sum(o.quantity for o in orders)
                avg_qty = np.mean([o.quantity for o in orders]) if orders else 0
                for order in orders:
                    if order.quantity >= threshold_qty:
                        large_orders.append({
                            'side': side_name,
                            'price': price,
                            'quantity': order.quantity,
                            'pct_of_level': order.quantity / total_at_price if total_at_price > 0 else 0,
                            'is_iceberg': order.quantity > avg_qty * 3  # Likely iceberg
                        })
        return sorted(large_orders, key=lambda x: x['quantity'], reverse=True)

    @staticmethod
    def spread_dynamics(book_history: List["LimitOrderBook"],
                        window: int = 10) -> Dict:
        """
        Spread, mid-price and imbalance statistics over the last `window`
        books of book_history.

        Books without a defined spread / mid price are skipped for the
        respective series. Returns an empty dict when fewer than two
        spreads are available. spread_trend and imbalance_trend are the
        last value minus the first; spread_percentile is the share of
        observed spreads that are <= the latest spread.
        """
        spreads = []
        mids = []
        imbalances = []
        for book in book_history[-window:]:
            s = book.get_spread_bps()
            m = book.get_mid_price()
            if s is not None:
                spreads.append(s)
            if m is not None:
                mids.append(m)
            imbalances.append(book.get_order_imbalance())
        if len(spreads) < 2:
            return {}
        latest_spread = spreads[-1]
        return {
            'avg_spread_bps': np.mean(spreads),
            'spread_volatility': np.std(spreads),
            'spread_trend': latest_spread - spreads[0],
            'spread_percentile': sum(1 for s in spreads if s <= latest_spread) / len(spreads),
            'mid_price_change_pct': (mids[-1] / mids[0] - 1) * 100 if len(mids) >= 2 and mids[0] > 0 else 0,
            'avg_imbalance': np.mean(imbalances),
            'imbalance_trend': imbalances[-1] - imbalances[0]
        }

    @staticmethod
    def order_flow_toxicity(book: "LimitOrderBook",
                            trade_history: List[Dict],
                            window: int = 50) -> Dict:
        """
        VPIN-like volume imbalance over the last `window` trades.

        Trades whose 'side' is 'bid' count as buy volume and trades whose
        'side' is 'ask' count as sell volume, so each trade lands in at most
        one bucket. vpin_approx = |buy - sell| / (buy + sell), in [0, 1].

        book is not consulted; the parameter is kept so existing callers
        keep working. Returns only the two zero-valued keys when there are
        no trades or no classified volume.
        """
        if not trade_history:
            return {'vpin_approx': 0.0, 'toxicity': 0.0}
        recent_trades = trade_history[-window:]
        buy_volume = sum(t['quantity'] for t in recent_trades
                         if t.get('side') == 'bid')
        sell_volume = sum(t['quantity'] for t in recent_trades
                          if t.get('side') == 'ask')
        total = buy_volume + sell_volume
        if total == 0:
            return {'vpin_approx': 0.0, 'toxicity': 0.0}
        # Toxicity = |buy_vol - sell_vol| / total
        vpin = abs(buy_volume - sell_volume) / total
        return {
            'vpin_approx': vpin,
            'toxicity': vpin,
            'buy_volume': buy_volume,
            'sell_volume': sell_volume,
            'total_volume': total
        }

    @staticmethod
    def all_features(book: "LimitOrderBook",
                     book_history: Optional[List["LimitOrderBook"]] = None) -> Dict:
        """
        Compute all LOB features at once as a flat dict.

        Depth-profile keys are prefixed with 'depth_', queue keys with
        'queue_' and spread-dynamics keys with 'spread_dyn_'. Spread
        dynamics are only included when book_history holds at least two
        books.
        """
        features = {}
        # Basic features
        snapshot = book.get_book_snapshot()
        features['mid_price'] = snapshot['mid_price']
        features['spread'] = snapshot['spread']
        features['spread_bps'] = snapshot['spread_bps']
        features['bid_depth_total'] = snapshot['total_bid_quantity']
        features['ask_depth_total'] = snapshot['total_ask_quantity']
        features['depth_imbalance'] = book.get_order_imbalance()
        # Depth profile
        depth = LOBFeatures.depth_profile(book)
        features.update({f'depth_{k}': v for k, v in depth.items()})
        # Queue features
        queue = LOBFeatures.queue_features(book)
        features.update({f'queue_{k}': v for k, v in queue.items()})
        # Large orders
        large = LOBFeatures.large_order_detection(book)
        features['n_large_orders'] = len(large)
        features['large_order_total_qty'] = sum(o['quantity'] for o in large)
        # Spread dynamics
        if book_history and len(book_history) >= 2:
            dynamics = LOBFeatures.spread_dynamics(book_history)
            features.update({f'spread_dyn_{k}': v for k, v in dynamics.items()})
        return features
def generate_synthetic_lob_feed(n_messages: int = 1000,
                                base_price: float = 100.0,
                                tick_size: float = 0.01,
                                seed: Optional[int] = 42) -> List[Dict]:
    """
    Generate a synthetic LOB message feed for testing.

    The feed starts with up to 50 'add' messages that seed the book (asks
    at or above base_price, bids at or below it), followed by a random mix
    of 'add' (50%), 'cancel' (30%) and 'trade' (20%) messages until
    n_messages messages exist in total. Cancels pick a random previously
    issued order id, which may already be gone from the book.

    Args:
        n_messages: Total number of messages to return.
        base_price: Price around which orders are placed.
        tick_size: Price increment used for offsets from base_price.
        seed: Seed for a generator local to this call, so the global NumPy
            random state is left untouched. None gives a non-reproducible
            feed.

    Returns:
        List of message dicts; each has 'type' and 'timestamp', plus
        'order_id' / 'side' / 'price' / 'quantity' depending on the type.
    """
    rng = np.random.RandomState(seed)
    messages = []
    order_counter = 0
    # Never emit more seed orders than the caller asked for in total.
    n_initial = max(0, min(50, n_messages))
    # Initialize with some orders
    for _ in range(n_initial):
        side = 'bid' if rng.rand() < 0.5 else 'ask'
        price = base_price + rng.randint(-50, 50) * tick_size
        # Keep the seed book uncrossed around base_price.
        if side == 'ask':
            price = max(price, base_price)
        else:
            price = min(price, base_price)
        messages.append({
            'type': 'add',
            'order_id': f'order_{order_counter}',
            'side': side,
            'price': round(price, 2),
            'quantity': rng.randint(100, 1000),
            'timestamp': len(messages) / 1000.0
        })
        order_counter += 1
    # Generate flowing messages
    for _ in range(n_messages - n_initial):
        msg_type = rng.choice(['add', 'cancel', 'trade'], p=[0.5, 0.3, 0.2])
        if msg_type == 'add':
            side = 'bid' if rng.rand() < 0.5 else 'ask'
            offset = rng.exponential(10) * tick_size
            price = base_price + (offset if side == 'ask' else -offset)
            # Clamp so a large bid offset cannot produce a non-positive price.
            price = round(max(price, 0.01), 2)
            messages.append({
                'type': 'add',
                'order_id': f'order_{order_counter}',
                'side': side,
                'price': price,
                'quantity': rng.randint(100, 2000),
                'timestamp': len(messages) / 1000.0
            })
            order_counter += 1
        elif msg_type == 'cancel' and order_counter > 0:
            # Cancel a random existing order
            messages.append({
                'type': 'cancel',
                'order_id': f'order_{rng.randint(0, order_counter)}',
                'timestamp': len(messages) / 1000.0
            })
        else:
            # Trade (also the fallback for a cancel with no orders issued)
            side = 'bid' if rng.rand() < 0.5 else 'ask'
            messages.append({
                'type': 'trade',
                'side': side,
                'quantity': rng.randint(100, 500),
                'timestamp': len(messages) / 1000.0
            })
    return messages
def process_message_feed(
        messages: List[Dict],
        snapshot_interval: int = 100,
) -> Tuple[LimitOrderBook, List[Dict], List[Dict]]:
    """
    Process a message feed and reconstruct the LOB.

    'add' messages become resting orders, 'cancel' messages remove an order
    by id, and 'trade' messages consume liquidity: the message's 'side' is
    treated as the side of the book being hit, so the book is executed with
    the opposite side as the incoming order. Other message types are
    ignored.

    After every snapshot_interval-th message a deep copy of the book is
    stored and a feature row is computed; feeds shorter than the interval
    therefore produce no feature rows.

    Args:
        messages: Feed of message dicts with a 'type' key ('add', 'cancel'
            or 'trade') and the fields that type requires.
        snapshot_interval: Number of messages between feature snapshots.

    Returns:
        (book, trades, features_history): the final book, one record per
        processed trade message (quantity may be 0 if nothing was filled),
        and the feature rows, each tagged with the message 'timestamp'.

    Raises:
        ValueError: If snapshot_interval is smaller than 1.
    """
    if snapshot_interval < 1:
        raise ValueError("snapshot_interval must be >= 1")
    book = LimitOrderBook(max_depth=20)
    trades = []
    book_history = []
    features_history = []
    for count, msg in enumerate(messages, start=1):
        msg_type = msg['type']
        if msg_type == 'add':
            book.add_order(OrderBookEntry(
                price=msg['price'],
                quantity=msg['quantity'],
                order_id=msg['order_id'],
                side=msg['side'],
                timestamp=msg['timestamp']
            ))
        elif msg_type == 'cancel':
            book.cancel_order(msg['order_id'])
        elif msg_type == 'trade':
            side = 'bid' if msg['side'] == 'ask' else 'ask'  # Opposite side
            avg_price, qty = book.execute_trade(side, msg['quantity'], aggressive=True)
            trades.append({
                'timestamp': msg['timestamp'],
                'side': msg['side'],
                'quantity': qty,
                'avg_price': avg_price
            })
        # Snapshot every snapshot_interval messages
        if count % snapshot_interval == 0:
            # Store a copy: the live book keeps mutating, and a history of
            # references to it would make every entry identical.
            book_history.append(copy.deepcopy(book))
            # LOBFeatures.spread_dynamics reads at most its default window
            # of 10 books, so older copies are dropped to bound memory.
            del book_history[:-10]
            features = LOBFeatures.all_features(book, book_history)
            features['timestamp'] = msg['timestamp']
            features_history.append(features)
    return book, trades, features_history
# Demo entry point: build a synthetic feed, replay it through the book and
# print the final book state, the top-of-book levels and the latest features.
if __name__ == '__main__':
    def _fmt(value, spec):
        """Format value with spec, or 'n/a' when the book has no such value."""
        return format(value, spec) if value is not None else 'n/a'

    print("=" * 70)
    print(" LIMIT ORDER BOOK RECONSTRUCTION")
    print("=" * 70)
    # Generate synthetic data
    messages = generate_synthetic_lob_feed(n_messages=5000)
    # Process
    book, trades, features = process_message_feed(messages)
    # Final snapshot
    snapshot = book.get_book_snapshot(depth=5)
    print("\nFinal LOB State:")
    # mid_price / spread_bps are None when one side of the book is empty,
    # which a bare ':.2f' format would reject with a TypeError.
    print(f" Mid Price: ${_fmt(snapshot['mid_price'], '.2f')}")
    print(f" Spread: {_fmt(snapshot['spread_bps'], '.1f')} bps")
    print(f" Bid Depth: {snapshot['bid_depth']} levels")
    print(f" Ask Depth: {snapshot['ask_depth']} levels")
    print(f" Total Bid Qty: {snapshot['total_bid_quantity']:,}")
    print(f" Total Ask Qty: {snapshot['total_ask_quantity']:,}")
    print(f" Order Imbalance: {book.get_order_imbalance():.3f}")
    # Level 2
    print("\nLevel 2 Book (top 5):")
    levels = LOBFeatures.price_levels(book, n=5)
    print(levels.to_string())
    # Features
    if features:
        print("\nLatest LOB Features:")
        latest = features[-1]
        for k, v in latest.items():
            # NumPy integers are not instances of int, so they are listed
            # explicitly to keep quantity-based features in the output.
            if isinstance(v, (int, float, np.integer, np.floating)):
                print(f" {k}: {v:.4f}")
    print(f"\n Trades executed: {len(trades)}")
    print(f" Total messages processed: {len(messages)}")
    print("\n This is what Jane Street sees every microsecond.")
    print(" Order imbalance, queue position, depth profile = PURE ALPHA.")