"""Execution Algorithms: TWAP, VWAP, Smart Order Routing What separates retail execution from institutional execution: - Retail: Market orders, immediate execution, pay spread - Institutional: TWAP/VWAP, slice orders across time, minimize market impact Market impact model: Price moves against you proportional to order size / daily volume """ import numpy as np import pandas as pd from typing import Dict, List, Optional, Tuple from dataclasses import dataclass import warnings warnings.filterwarnings('ignore') @dataclass class Order: """Single order specification""" symbol: str side: str # 'buy' or 'sell' quantity: int order_type: str # 'market', 'limit', 'twap', 'vwap' limit_price: Optional[float] = None def __post_init__(self): self.side = self.side.lower() self.order_type = self.order_type.lower() class MarketImpactModel: """ Square-root market impact model (Almgren-Chriss, 1999). Market impact = σ * sqrt(Q / V) Where: - σ = daily volatility - Q = order quantity - V = daily volume Temporary impact: decays within minutes Permanent impact: persists """ def __init__(self, temp_impact_coef: float = 0.5, perm_impact_coef: float = 0.1, decay_halflife: int = 10): self.temp_impact_coef = temp_impact_coef self.perm_impact_coef = perm_impact_coef self.decay_halflife = decay_halflife def temporary_impact(self, order_size: int, daily_volume: int, volatility: float) -> float: """Temporary price impact (bps)""" participation = order_size / max(daily_volume, 1) return self.temp_impact_coef * volatility * np.sqrt(participation) def permanent_impact(self, order_size: int, daily_volume: int, volatility: float) -> float: """Permanent price impact (bps)""" participation = order_size / max(daily_volume, 1) return self.perm_impact_coef * volatility * participation class TWAPScheduler: """ Time-Weighted Average Price execution. Slices parent order into N child orders, equally distributed in time. When to use: When you want to minimize timing risk and have no view on intraday price direction. Simple, predictable, low market impact. Formula: Child qty = Total qty / N buckets """ def __init__(self, n_buckets: int = 20, bucket_duration_minutes: int = 15): self.n_buckets = n_buckets self.bucket_duration = bucket_duration_minutes def schedule(self, order: Order, start_time: pd.Timestamp, end_time: Optional[pd.Timestamp] = None) -> pd.DataFrame: """ Create TWAP execution schedule. Returns DataFrame with bucket_start, bucket_end, target_qty """ if end_time is None: end_time = start_time + pd.Timedelta( minutes=self.n_buckets * self.bucket_duration ) # Time buckets buckets = pd.date_range( start=start_time, end=end_time, periods=self.n_buckets + 1 ) # Equal quantity per bucket qty_per_bucket = order.quantity // self.n_buckets remainder = order.quantity % self.n_buckets quantities = [qty_per_bucket] * self.n_buckets # Add remainder to first buckets for i in range(remainder): quantities[i] += 1 schedule = pd.DataFrame({ 'bucket_start': buckets[:-1], 'bucket_end': buckets[1:], 'target_qty': quantities, 'fraction': 1.0 / self.n_buckets, 'algorithm': 'TWAP', 'symbol': order.symbol, 'side': order.side }) return schedule def execute(self, schedule: pd.DataFrame, market_prices: pd.Series, impact_model: Optional[MarketImpactModel] = None, daily_volume: int = 1000000, volatility: float = 0.02) -> Dict: """ Simulate TWAP execution with market impact. Returns execution statistics. """ if impact_model is None: impact_model = MarketImpactModel() executed_qty = 0 total_cost = 0 prices = [] impacts = [] for _, row in schedule.iterrows(): qty = row['target_qty'] # Get price at bucket start (approximation) mask = market_prices.index >= row['bucket_start'] if mask.any(): price = market_prices[mask].iloc[0] else: price = market_prices.iloc[-1] # Market impact impact_bps = impact_model.temporary_impact( qty, daily_volume, volatility ) impact_price = price * (1 + impact_bps / 10000) # Cost cost = qty * impact_price total_cost += cost executed_qty += qty prices.append(price) impacts.append(impact_bps) # VWAP benchmark vwap = total_cost / executed_qty if executed_qty > 0 else 0 # Metrics avg_impact = np.mean(impacts) max_impact = np.max(impacts) return { 'algorithm': 'TWAP', 'total_qty': executed_qty, 'total_cost': total_cost, 'avg_price': vwap, 'avg_impact_bps': avg_impact, 'max_impact_bps': max_impact, 'slippage_bps': avg_impact, 'n_child_orders': len(schedule) } class VWAPScheduler: """ Volume-Weighted Average Price execution. Slices parent order proportionally to historical volume profile. Executes more in high-volume periods (typically open, close, mid-day lull). When to use: When you want to match the market VWAP. Institutional benchmark: Did my execution VWAP match the market VWAP? Formula: Child qty_i = Total qty * (Volume_i / Total_Volume) """ def __init__(self, n_buckets: int = 20, default_profile: Optional[Dict[int, float]] = None): self.n_buckets = n_buckets # Default intraday volume profile (U-shape: high at open/close) if default_profile is None: # Hour of day -> volume fraction (simplified) self.default_profile = { 9: 0.08, # 9-10 AM: High 10: 0.06, 11: 0.05, 12: 0.04, # Mid-day lull 13: 0.04, 14: 0.05, 15: 0.07, 16: 0.10, # 3-4 PM: High (close) } else: self.default_profile = default_profile def estimate_volume_profile(self, trade_data: pd.DataFrame, bucket_size: str = '30min') -> pd.Series: """ Estimate intraday volume profile from historical trade data. trade_data columns: timestamp, volume """ trade_data = trade_data.copy() trade_data['time'] = pd.to_datetime(trade_data.index).time # Resample profile = trade_data.resample(bucket_size)['volume'].mean() # Normalize to fractions profile = profile / profile.sum() return profile def schedule(self, order: Order, start_time: pd.Timestamp, end_time: Optional[pd.Timestamp] = None, volume_profile: Optional[pd.Series] = None) -> pd.DataFrame: """Create VWAP execution schedule""" if end_time is None: end_time = start_time + pd.Timedelta(hours=6) # Generate time buckets n_buckets = self.n_buckets buckets = pd.date_range(start=start_time, end=end_time, periods=n_buckets + 1) # Get volume fractions for each bucket if volume_profile is not None: # Map buckets to volume profile fractions = [] for i in range(n_buckets): bucket_start = buckets[i] hour = bucket_start.hour frac = volume_profile.get(hour, 1.0 / n_buckets) fractions.append(frac) # Normalize fractions = np.array(fractions) fractions = fractions / fractions.sum() else: fractions = np.ones(n_buckets) / n_buckets # Allocate quantities quantities = (fractions * order.quantity).astype(int) # Handle rounding remainder = order.quantity - quantities.sum() quantities[0] += remainder schedule = pd.DataFrame({ 'bucket_start': buckets[:-1], 'bucket_end': buckets[1:], 'target_qty': quantities, 'fraction': fractions, 'algorithm': 'VWAP', 'symbol': order.symbol, 'side': order.side }) return schedule def execute(self, schedule: pd.DataFrame, market_prices: pd.Series, market_volumes: pd.Series, impact_model: Optional[MarketImpactModel] = None) -> Dict: """Simulate VWAP execution""" if impact_model is None: impact_model = MarketImpactModel() executed_qty = 0 total_cost = 0 prices = [] impacts = [] for _, row in schedule.iterrows(): qty = row['target_qty'] if qty <= 0: continue mask = market_prices.index >= row['bucket_start'] if mask.any(): price = market_prices[mask].iloc[0] vol = market_volumes[mask].iloc[0] if len(market_volumes[mask]) > 0 else 1000000 else: price = market_prices.iloc[-1] vol = 1000000 # Impact proportional to participation impact_bps = impact_model.temporary_impact(qty, vol, 0.02) impact_price = price * (1 + impact_bps / 10000) cost = qty * impact_price total_cost += cost executed_qty += qty prices.append(price) impacts.append(impact_bps) vwap = total_cost / executed_qty if executed_qty > 0 else 0 # Market VWAP (what we tried to match) market_vwap = (market_prices * market_volumes).sum() / market_volumes.sum() return { 'algorithm': 'VWAP', 'total_qty': executed_qty, 'total_cost': total_cost, 'avg_price': vwap, 'market_vwap': market_vwap, 'vwap_deviation_bps': abs(vwap - market_vwap) / market_vwap * 10000 if market_vwap > 0 else 0, 'avg_impact_bps': np.mean(impacts) if impacts else 0, 'n_child_orders': len(schedule) } class SmartOrderRouter: """ Smart Order Routing: Select optimal venue/algorithm based on order characteristics. Decision tree: - Small orders (< 1% ADV): Market/limit, single venue - Medium orders (1-10% ADV): TWAP over 1-2 hours - Large orders (> 10% ADV): VWAP over full day, possibly dark pools - Urgent: Market order, accept impact - Patient: TWAP/VWAP, minimize impact """ def __init__(self, impact_model: Optional[MarketImpactModel] = None): self.impact_model = impact_model or MarketImpactModel() self.twap = TWAPScheduler() self.vwap = VWAPScheduler() def route_order(self, order: Order, avg_daily_volume: int, urgency: str = 'normal', volatility: float = 0.02) -> Dict: """ Route order to optimal execution strategy. Args: order: Order specification avg_daily_volume: Average daily volume of the symbol urgency: 'urgent', 'normal', 'patient' volatility: Daily volatility Returns: Dict with routing decision and execution schedule """ participation = order.quantity / max(avg_daily_volume, 1) # Decision logic if urgency == 'urgent' or participation < 0.01: # Small or urgent: Single market/limit order strategy = 'market' expected_impact = self.impact_model.temporary_impact( order.quantity, avg_daily_volume, volatility ) schedule = pd.DataFrame({ 'bucket_start': [pd.Timestamp.now()], 'bucket_end': [pd.Timestamp.now()], 'target_qty': [order.quantity], 'fraction': [1.0], 'algorithm': 'MARKET', 'symbol': [order.symbol], 'side': [order.side] }) elif participation < 0.05: # Medium: TWAP over 2 hours strategy = 'twap' schedule = self.twap.schedule( order, pd.Timestamp.now(), end_time=pd.Timestamp.now() + pd.Timedelta(hours=2) ) expected_impact = self.impact_model.temporary_impact( order.quantity // len(schedule), avg_daily_volume, volatility ) else: # Large: VWAP over full day strategy = 'vwap' schedule = self.vwap.schedule( order, pd.Timestamp.now(), end_time=pd.Timestamp.now() + pd.Timedelta(hours=6) ) expected_impact = self.impact_model.temporary_impact( order.quantity // len(schedule), avg_daily_volume, volatility ) return { 'order': order, 'strategy': strategy, 'participation_rate': participation, 'expected_impact_bps': expected_impact, 'schedule': schedule, 'urgency': urgency } def benchmark_execution_algorithms(): """Compare TWAP vs VWAP vs Market order on synthetic data""" np.random.seed(42) # Generate synthetic intraday data n_minutes = 390 # Trading day minutes (9:30 - 16:00) times = pd.date_range('2024-01-01 09:30', periods=n_minutes, freq='1min') # Price: random walk with slight drift price = 100.0 prices = [price] for _ in range(n_minutes - 1): price *= (1 + np.random.randn() * 0.001) prices.append(price) # Volume: U-shaped intraday pattern base_vol = 1000 hours = np.arange(n_minutes) / 60 vol_pattern = 0.5 + 2.0 * np.exp(-((hours - 0.5) ** 2) / 0.1) + \ 0.5 * np.sin(hours * np.pi) volumes = (base_vol * vol_pattern * (1 + np.random.randn(n_minutes) * 0.2)).astype(int) volumes = np.maximum(volumes, 100) price_series = pd.Series(prices, index=times) volume_series = pd.Series(volumes, index=times) # Create order order = Order(symbol='AAPL', side='buy', quantity=50000, order_type='twap') # TWAP twap = TWAPScheduler(n_buckets=20) twap_schedule = twap.schedule(order, times[0]) twap_result = twap.execute(twap_schedule, price_series, daily_volume=volumes.sum(), volatility=0.02) # VWAP vwap = VWAPScheduler(n_buckets=20) vwap_schedule = vwap.schedule(order, times[0], volume_profile=None) vwap_result = vwap.execute(vwap_schedule, price_series, volume_series) # Market (single order) market_impact = MarketImpactModel() market_price = price_series.iloc[0] impact = market_impact.temporary_impact(50000, volumes.sum(), 0.02) market_cost = 50000 * market_price * (1 + impact / 10000) print("=" * 60) print("EXECUTION ALGORITHM BENCHMARK") print("=" * 60) print(f"\nOrder: Buy 50,000 AAPL shares") print(f"ADV: {volumes.sum():,} | Participation: {50000/volumes.sum()*100:.1f}%") print() print(f"MARKET ORDER:") print(f" Cost: ${market_cost:,.2f} | Impact: {impact:.1f} bps | Slippage: {impact:.1f} bps") print() print(f"TWAP:") print(f" Cost: ${twap_result['total_cost']:,.2f} | Impact: {twap_result['avg_impact_bps']:.1f} bps") print(f" Avg Price: ${twap_result['avg_price']:.2f} | Child Orders: {twap_result['n_child_orders']}") print() print(f"VWAP:") print(f" Cost: ${vwap_result['total_cost']:,.2f} | Impact: {vwap_result['avg_impact_bps']:.1f} bps") print(f" Avg Price: ${vwap_result['avg_price']:.2f}") print(f" Market VWAP: ${vwap_result['market_vwap']:.2f} | Deviation: {vwap_result['vwap_deviation_bps']:.1f} bps") print() savings_twap = (market_cost - twap_result['total_cost']) / market_cost * 100 savings_vwap = (market_cost - vwap_result['total_cost']) / market_cost * 100 print(f"Savings vs Market Order:") print(f" TWAP: {savings_twap:.2f}% | VWAP: {savings_vwap:.2f}%") if __name__ == '__main__': benchmark_execution_algorithms()