alphaforge-quant-system / market_making.py
Premchan369's picture
Add market making engine with Avellaneda-Stoikov quoting, inventory management, adverse selection detection
513693b verified
"""Market Making Engine — What Jane Street Actually Does
Jane Street is primarily a MARKET MAKER, not a directional trader.
They quote bid/ask on options, ETFs, bonds — make money on spread + volume.
Key challenges:
1. Adverse selection: informed traders pick off your quotes
2. Inventory risk: holding positions you don't want
3. Spread optimization: too wide = no volume, too tight = get run over
4. Regulatory constraints: Reg NMS, MiFID II
Based on:
- Avellaneda & Stoikov (2008): "High-frequency trading in a limit order book"
- Guéant et al. (2012): "Dealing with the inventory risk"
- Cartea & Jaimungal (2013): "Modeling asset prices for algorithmic trading"
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional, Callable
from dataclasses import dataclass
import warnings
warnings.filterwarnings('ignore')
@dataclass
class InventoryState:
"""Current market maker position"""
position: float = 0.0 # Net position
cash: float = 0.0 # Cash balance
pnl_realized: float = 0.0
pnl_unrealized: float = 0.0
trades_executed: int = 0
quotes_submitted: int = 0
quotes_filled: int = 0
def total_pnl(self, mark_price: float) -> float:
return self.pnl_realized + self.position * mark_price + self.cash
class MarketMakerQuote:
"""Single market maker quote"""
def __init__(self, side: str, price: float, quantity: int,
aggression: str = 'passive'):
self.side = side # 'bid' or 'ask'
self.price = price
self.quantity = quantity
self.aggression = aggression # 'passive' (resting) or 'aggressive' (crossing)
self.fill_probability = 0.0
self.expected_profit = 0.0
class AvellanedaStoikovMarketMaker:
"""
Avellaneda-Stoikov (2008) market making model.
Key insight: Quote prices should DEPEND on current inventory.
Reservation price (where you're indifferent to trade):
r = s - q * γ * σ² * (T - t)
Spread:
δ^a + δ^b = γ * σ² * (T - t) + (2/γ) * ln(1 + γ/κ)
Where:
- s = mid price
- q = inventory position
- γ = risk aversion
- σ = volatility
- T-t = time remaining
- κ = order arrival intensity
As inventory grows positive → skew quotes DOWN (want to sell)
As inventory grows negative → skew quotes UP (want to buy)
"""
def __init__(self,
gamma: float = 0.1, # Risk aversion
sigma: float = 0.02, # Volatility (per period)
kappa: float = 1.5, # Order arrival rate
max_position: float = 1000.0, # Position limit
min_spread_bps: float = 1.0, # Minimum spread in bps
max_spread_bps: float = 50.0, # Maximum spread
inventory_skew_factor: float = 2.0): # How much to skew
self.gamma = gamma
self.sigma = sigma
self.kappa = kappa
self.max_position = max_position
self.min_spread_bps = min_spread_bps / 10000.0 # Convert to decimal
self.max_spread_bps = max_spread_bps / 10000.0
self.inventory_skew_factor = inventory_skew_factor
self.state = InventoryState()
self.quote_history = []
self.pnl_history = []
def reset(self):
"""Reset state"""
self.state = InventoryState()
self.quote_history = []
self.pnl_history = []
def calculate_quotes(self,
mid_price: float,
time_to_end: float = 1.0,
current_inventory: Optional[float] = None) -> Tuple[MarketMakerQuote, MarketMakerQuote]:
"""
Calculate optimal bid and ask quotes.
Returns: (bid_quote, ask_quote)
"""
if current_inventory is None:
current_inventory = self.state.position
# Reservation price (inventory-adjusted mid)
reservation_price = mid_price - current_inventory * self.gamma * (self.sigma ** 2) * time_to_end
# Optimal spread
optimal_spread = self.gamma * (self.sigma ** 2) * time_to_end + \
(2.0 / self.gamma) * np.log(1 + self.gamma / self.kappa)
# Apply min/max spread constraints
spread_decimal = max(optimal_spread, self.min_spread_bps * mid_price)
spread_decimal = min(spread_decimal, self.max_spread_bps * mid_price)
# Inventory skewing
# If long (q > 0), make ask more attractive (lower ask), bid less attractive
# If short (q < 0), make bid more attractive (higher bid), ask less attractive
skew = np.tanh(current_inventory / self.max_position * self.inventory_skew_factor)
half_spread = spread_decimal / 2
# Skew: shift quotes away from reservation price
bid_offset = half_spread * (1 + skew) # Higher bid when short
ask_offset = half_spread * (1 - skew) # Lower ask when long
bid_price = reservation_price - bid_offset
ask_price = reservation_price + ask_offset
# Ensure bid < ask
if bid_price >= ask_price:
# Emergency: force minimum spread
avg = (bid_price + ask_price) / 2
bid_price = avg - self.min_spread_bps * mid_price / 2
ask_price = avg + self.min_spread_bps * mid_price / 2
# Quantity sizing: larger when inventory is neutral, smaller when extreme
inventory_ratio = abs(current_inventory) / self.max_position
qty_multiplier = 1.0 - 0.7 * inventory_ratio # Reduce size as inventory grows
base_qty = 100
bid_qty = int(base_qty * qty_multiplier)
ask_qty = int(base_qty * qty_multiplier)
# If extremely long, don't quote on ask (or tiny qty)
if current_inventory > self.max_position * 0.9:
ask_qty = 0
# If extremely short, don't quote on bid
if current_inventory < -self.max_position * 0.9:
bid_qty = 0
bid_quote = MarketMakerQuote('bid', bid_price, bid_qty, 'passive')
ask_quote = MarketMakerQuote('ask', ask_price, ask_qty, 'passive')
# Expected fill probability (simplified)
bid_quote.fill_probability = np.exp(-self.kappa * bid_offset)
ask_quote.fill_probability = np.exp(-self.kappa * ask_offset)
# Expected profit per trade = half spread (simplified)
bid_quote.expected_profit = bid_offset
ask_quote.expected_profit = ask_offset
return bid_quote, ask_quote
def process_fill(self, quote: MarketMakerQuote,
fill_qty: int,
fill_price: float,
is_aggressive_side: bool):
"""
Process a quote fill.
is_aggressive_side: True if WE were aggressive (market order),
False if counterparty hit our resting quote
"""
if quote.side == 'bid':
# We bought
self.state.position += fill_qty
self.state.cash -= fill_qty * fill_price
self.state.trades_executed += 1
else:
# We sold
self.state.position -= fill_qty
self.state.cash += fill_qty * fill_price
self.state.trades_executed += 1
self.state.quotes_filled += 1
# Track
self.quote_history.append({
'side': quote.side,
'quote_price': quote.price,
'fill_price': fill_price,
'quantity': fill_qty,
'position_after': self.state.position,
'cash_after': self.state.cash
})
def update_mark_price(self, mark_price: float):
"""Update unrealized PnL with current mark"""
self.state.pnl_unrealized = self.state.position * mark_price + self.state.cash
self.pnl_history.append({
'mark_price': mark_price,
'position': self.state.position,
'cash': self.state.cash,
'unrealized_pnl': self.state.pnl_unrealized
})
def get_summary(self) -> Dict:
"""Get current market maker summary"""
return {
'position': self.state.position,
'cash': self.state.cash,
'trades': self.state.trades_executed,
'quotes_filled': self.state.quotes_filled,
'pnl_realized': self.state.pnl_realized,
'pnl_unrealized': self.state.pnl_unrealized,
'inventory_ratio': abs(self.state.position) / self.max_position
}
class InventoryRiskManager:
"""
Advanced inventory risk management for market making.
When inventory exceeds limits:
1. Hedge via correlated instruments
2. Cross the spread (aggressive unwind)
3. Reduce quote sizes
4. Stop quoting on the bad side entirely
"""
def __init__(self,
max_inventory: float = 1000,
hedge_threshold: float = 0.6, # Hedge at 60% of max
stop_threshold: float = 0.9, # Stop quoting at 90%
aggressive_unwind_threshold: float = 0.95): # Market order at 95%
self.max_inventory = max_inventory
self.hedge_threshold = hedge_threshold
self.stop_threshold = stop_threshold
self.aggressive_unwind_threshold = aggressive_unwind_threshold
def check_inventory(self, position: float) -> Dict:
"""Determine actions needed based on inventory"""
ratio = abs(position) / self.max_inventory
actions = {
'hedge': False,
'stop_quoting_bad_side': False,
'aggressive_unwind': False,
'reduce_size': 1.0, # Size multiplier
'status': 'normal'
}
if ratio >= self.aggressive_unwind_threshold:
actions['aggressive_unwind'] = True
actions['stop_quoting_bad_side'] = True
actions['reduce_size'] = 0.0
actions['status'] = 'CRITICAL'
elif ratio >= self.stop_threshold:
actions['stop_quoting_bad_side'] = True
actions['reduce_size'] = 0.1
actions['status'] = 'SEVERE'
elif ratio >= self.hedge_threshold:
actions['hedge'] = True
actions['reduce_size'] = 0.5
actions['status'] = 'WARNING'
elif ratio >= 0.5:
actions['reduce_size'] = 0.8
actions['status'] = 'MODERATE'
return actions
def hedge_recommendation(self,
position: float,
correlated_assets: Dict[str, float]) -> Optional[Dict]:
"""
Recommend hedge position in correlated assets.
correlated_assets: {symbol: correlation_with_primary}
"""
if abs(position) < self.max_inventory * self.hedge_threshold:
return None
# Find best hedge: highest absolute correlation
best_hedge = None
best_corr = 0
for symbol, corr in correlated_assets.items():
if abs(corr) > best_corr:
best_corr = abs(corr)
best_hedge = symbol
if best_hedge is None:
return None
# Hedge amount: offset position in primary
hedge_direction = -np.sign(position)
hedge_size = abs(position) * abs(correlated_assets[best_hedge])
return {
'hedge_symbol': best_hedge,
'direction': 'buy' if hedge_direction > 0 else 'sell',
'quantity': hedge_size,
'correlation': correlated_assets[best_hedge],
'expected_hedge_effectiveness': best_corr ** 2 # R²
}
class AdverseSelectionDetector:
"""
Detect and respond to adverse selection.
Adverse selection: Informed traders know something you don't.
When they buy from you, price drops. When they sell to you, price rises.
Detection methods:
1. Post-trade price movement
2. Order flow toxicity (VPIN)
3. Large order detection
4. Timing patterns (orders arrive in clusters before news)
"""
def __init__(self,
lookback_window: int = 20,
toxicity_threshold: float = 0.6):
self.lookback_window = lookback_window
self.toxicity_threshold = toxicity_threshold
self.trade_history = []
self.toxicity_score = 0.0
def record_trade(self,
side: str, # Which side WE filled
our_price: float, # Price we got
post_prices: List[float], # Prices after trade (1min, 5min, 15min)
quantity: int,
counterparty: Optional[str] = None):
"""Record a trade for adverse selection analysis"""
# Calculate post-trade drift
drift = 0
if post_prices and len(post_prices) >= 1:
# If we SOLD and price went UP → bad (gave away value)
# If we BOUGHT and price went DOWN → bad (overpaid)
if side == 'ask': # We sold
drift = post_prices[0] - our_price
else: # We bought
drift = our_price - post_prices[0]
self.trade_history.append({
'side': side,
'our_price': our_price,
'post_drift': drift,
'quantity': quantity,
'counterparty': counterparty,
'adverse': drift > 0 # True if trade was bad for us
})
# Keep only recent trades
if len(self.trade_history) > self.lookback_window:
self.trade_history.pop(0)
def get_toxicity_score(self) -> float:
"""Current toxicity score (0-1, higher = more adverse selection)"""
if len(self.trade_history) < 5:
return 0.0
adverse_count = sum(1 for t in self.trade_history if t['adverse'])
self.toxicity_score = adverse_count / len(self.trade_history)
return self.toxicity_score
def should_widen_spread(self) -> Tuple[bool, float]:
"""Should we widen spread due to adverse selection?"""
toxicity = self.get_toxicity_score()
if toxicity > self.toxicity_threshold:
# Widen spread proportionally
widen_factor = 1.0 + (toxicity - self.toxicity_threshold) * 2
return True, min(widen_factor, 3.0) # Max 3x wider
return False, 1.0
def get_recent_pnl(self) -> Dict:
"""P&L attribution from adverse selection"""
if not self.trade_history:
return {}
adverse_trades = [t for t in self.trade_history if t['adverse']]
good_trades = [t for t in self.trade_history if not t['adverse']]
adverse_drift = sum(t['post_drift'] * t['quantity'] for t in adverse_trades)
good_drift = sum(t['post_drift'] * t['quantity'] for t in good_trades)
return {
'total_trades': len(self.trade_history),
'adverse_trades': len(adverse_trades),
'adverse_pct': len(adverse_trades) / len(self.trade_history) * 100,
'total_adverse_cost': adverse_drift,
'total_good_gain': -good_drift,
'net_selection_cost': adverse_drift + good_drift
}
def simulate_market_making(n_steps: int = 1000,
price_drift: float = 0.0001,
volatility: float = 0.01,
arrival_rate: float = 0.3) -> pd.DataFrame:
"""
Simulate a market maker in a random walk market.
Generates synthetic tick data and lets the market maker quote and fill.
"""
np.random.seed(42)
# Initialize
mm = AvellanedaStoikovMarketMaker(
gamma=0.1,
sigma=volatility,
kappa=1.5,
max_position=1000
)
detector = AdverseSelectionDetector(lookback_window=20)
risk_mgr = InventoryRiskManager()
# Price process
price = 100.0
prices = [price]
results = []
for step in range(n_steps):
# Update price
price_change = np.random.randn() * volatility * price + price_drift * price
price += price_change
price = max(price, 0.01)
prices.append(price)
# Calculate quotes
bid_quote, ask_quote = mm.calculate_quotes(price, time_to_end=1.0)
# Check inventory risk
inventory_actions = risk_mgr.check_inventory(mm.state.position)
# Check adverse selection
widen, widen_factor = detector.should_widen_spread()
if widen:
# Widen spread
spread_adj = (widen_factor - 1.0) * (ask_quote.price - bid_quote.price) / 2
bid_quote.price -= spread_adj
ask_quote.price += spread_adj
# Simulate order arrivals
if np.random.rand() < arrival_rate:
# Someone hits our bid
if np.random.rand() < bid_quote.fill_probability:
fill_qty = np.random.randint(10, bid_quote.quantity + 1)
mm.process_fill(bid_quote, fill_qty, bid_quote.price, False)
# Record for adverse selection
future_prices = prices[-5:] if len(prices) >= 5 else prices
detector.record_trade('bid', bid_quote.price, future_prices, fill_qty)
if np.random.rand() < arrival_rate:
# Someone lifts our ask
if np.random.rand() < ask_quote.fill_probability:
fill_qty = np.random.randint(10, ask_quote.quantity + 1)
mm.process_fill(ask_quote, fill_qty, ask_quote.price, False)
future_prices = prices[-5:] if len(prices) >= 5 else prices
detector.record_trade('ask', ask_quote.price, future_prices, fill_qty)
# Mark to market
mm.update_mark_price(price)
# Record
summary = mm.get_summary()
results.append({
'step': step,
'price': price,
'bid': bid_quote.price,
'ask': ask_quote.price,
'spread_bps': (ask_quote.price - bid_quote.price) / price * 10000,
'position': summary['position'],
'cash': summary['cash'],
'inventory_ratio': summary['inventory_ratio'],
'unrealized_pnl': summary['pnl_unrealized'],
'toxicity': detector.get_toxicity_score(),
'status': inventory_actions['status']
})
return pd.DataFrame(results)
if __name__ == '__main__':
print("=" * 70)
print(" MARKET MAKING ENGINE SIMULATION")
print("=" * 70)
results = simulate_market_making(n_steps=5000)
# Summary
final = results.iloc[-1]
print(f"\nSimulation: 5000 steps, random walk market")
print(f" Initial Price: $100.00")
print(f" Final Price: ${final['price']:.2f}")
print(f" Final Position: {final['position']:.0f}")
print(f" Final Cash: ${final['cash']:.2f}")
print(f" Unrealized PnL: ${final['unrealized_pnl']:.2f}")
print(f" Avg Spread: {results['spread_bps'].mean():.1f} bps")
print(f" Avg Position: {abs(results['position']).mean():.0f}")
print(f" Max Position: {results['position'].abs().max():.0f}")
print(f" Avg Toxicity: {results['toxicity'].mean():.3f}")
# Strategy attribution
pnl_from_spread = results['spread_bps'].mean() / 10000 * 2 * 100 # Simplified
print(f"\n PnL Attribution:")
print(f" Spread capture: ~${pnl_from_spread * 50:.0f} (per 100 trades)")
print(f" Inventory risk: ${final['unrealized_pnl'] - pnl_from_spread * 50:.0f}")
print(f"\n This is how Jane Street makes money:")
print(f" 1. Quote tight spreads 1000s of times per day")
print(f" 2. Inventory management keeps risk bounded")
print(f" 3. Adverse selection detection widens when toxic flow arrives")
print(f" 4. Volume × Small spread margin = Big PnL")