| """Execution Algorithms: TWAP, VWAP, Smart Order Routing |
| |
| What separates retail execution from institutional execution: |
| - Retail: Market orders, immediate execution, pay spread |
| - Institutional: TWAP/VWAP, slice orders across time, minimize market impact |
| |
| Market impact model: Price moves against you proportional to order size / daily volume |
| """ |
| import numpy as np |
| import pandas as pd |
| from typing import Dict, List, Optional, Tuple |
| from dataclasses import dataclass |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
| @dataclass |
| class Order: |
| """Single order specification""" |
| symbol: str |
| side: str |
| quantity: int |
| order_type: str |
| limit_price: Optional[float] = None |
| |
| def __post_init__(self): |
| self.side = self.side.lower() |
| self.order_type = self.order_type.lower() |
|
|
|
|
| class MarketImpactModel: |
| """ |
| Square-root market impact model (Almgren-Chriss, 1999). |
| |
| Market impact = σ * sqrt(Q / V) |
| Where: |
| - σ = daily volatility |
| - Q = order quantity |
| - V = daily volume |
| |
| Temporary impact: decays within minutes |
| Permanent impact: persists |
| """ |
| |
| def __init__(self, |
| temp_impact_coef: float = 0.5, |
| perm_impact_coef: float = 0.1, |
| decay_halflife: int = 10): |
| self.temp_impact_coef = temp_impact_coef |
| self.perm_impact_coef = perm_impact_coef |
| self.decay_halflife = decay_halflife |
| |
| def temporary_impact(self, order_size: int, daily_volume: int, |
| volatility: float) -> float: |
| """Temporary price impact (bps)""" |
| participation = order_size / max(daily_volume, 1) |
| return self.temp_impact_coef * volatility * np.sqrt(participation) |
| |
| def permanent_impact(self, order_size: int, daily_volume: int, |
| volatility: float) -> float: |
| """Permanent price impact (bps)""" |
| participation = order_size / max(daily_volume, 1) |
| return self.perm_impact_coef * volatility * participation |
|
|
|
|
| class TWAPScheduler: |
| """ |
| Time-Weighted Average Price execution. |
| |
| Slices parent order into N child orders, equally distributed in time. |
| |
| When to use: When you want to minimize timing risk and have no view |
| on intraday price direction. Simple, predictable, low market impact. |
| |
| Formula: Child qty = Total qty / N buckets |
| """ |
| |
| def __init__(self, |
| n_buckets: int = 20, |
| bucket_duration_minutes: int = 15): |
| self.n_buckets = n_buckets |
| self.bucket_duration = bucket_duration_minutes |
| |
| def schedule(self, order: Order, |
| start_time: pd.Timestamp, |
| end_time: Optional[pd.Timestamp] = None) -> pd.DataFrame: |
| """ |
| Create TWAP execution schedule. |
| |
| Returns DataFrame with bucket_start, bucket_end, target_qty |
| """ |
| if end_time is None: |
| end_time = start_time + pd.Timedelta( |
| minutes=self.n_buckets * self.bucket_duration |
| ) |
| |
| |
| buckets = pd.date_range( |
| start=start_time, |
| end=end_time, |
| periods=self.n_buckets + 1 |
| ) |
| |
| |
| qty_per_bucket = order.quantity // self.n_buckets |
| remainder = order.quantity % self.n_buckets |
| |
| quantities = [qty_per_bucket] * self.n_buckets |
| |
| for i in range(remainder): |
| quantities[i] += 1 |
| |
| schedule = pd.DataFrame({ |
| 'bucket_start': buckets[:-1], |
| 'bucket_end': buckets[1:], |
| 'target_qty': quantities, |
| 'fraction': 1.0 / self.n_buckets, |
| 'algorithm': 'TWAP', |
| 'symbol': order.symbol, |
| 'side': order.side |
| }) |
| |
| return schedule |
| |
| def execute(self, schedule: pd.DataFrame, |
| market_prices: pd.Series, |
| impact_model: Optional[MarketImpactModel] = None, |
| daily_volume: int = 1000000, |
| volatility: float = 0.02) -> Dict: |
| """ |
| Simulate TWAP execution with market impact. |
| |
| Returns execution statistics. |
| """ |
| if impact_model is None: |
| impact_model = MarketImpactModel() |
| |
| executed_qty = 0 |
| total_cost = 0 |
| prices = [] |
| impacts = [] |
| |
| for _, row in schedule.iterrows(): |
| qty = row['target_qty'] |
| |
| |
| mask = market_prices.index >= row['bucket_start'] |
| if mask.any(): |
| price = market_prices[mask].iloc[0] |
| else: |
| price = market_prices.iloc[-1] |
| |
| |
| impact_bps = impact_model.temporary_impact( |
| qty, daily_volume, volatility |
| ) |
| impact_price = price * (1 + impact_bps / 10000) |
| |
| |
| cost = qty * impact_price |
| total_cost += cost |
| executed_qty += qty |
| |
| prices.append(price) |
| impacts.append(impact_bps) |
| |
| |
| vwap = total_cost / executed_qty if executed_qty > 0 else 0 |
| |
| |
| avg_impact = np.mean(impacts) |
| max_impact = np.max(impacts) |
| |
| return { |
| 'algorithm': 'TWAP', |
| 'total_qty': executed_qty, |
| 'total_cost': total_cost, |
| 'avg_price': vwap, |
| 'avg_impact_bps': avg_impact, |
| 'max_impact_bps': max_impact, |
| 'slippage_bps': avg_impact, |
| 'n_child_orders': len(schedule) |
| } |
|
|
|
|
| class VWAPScheduler: |
| """ |
| Volume-Weighted Average Price execution. |
| |
| Slices parent order proportionally to historical volume profile. |
| Executes more in high-volume periods (typically open, close, mid-day lull). |
| |
| When to use: When you want to match the market VWAP. |
| Institutional benchmark: Did my execution VWAP match the market VWAP? |
| |
| Formula: Child qty_i = Total qty * (Volume_i / Total_Volume) |
| """ |
| |
| def __init__(self, |
| n_buckets: int = 20, |
| default_profile: Optional[Dict[int, float]] = None): |
| self.n_buckets = n_buckets |
| |
| |
| if default_profile is None: |
| |
| self.default_profile = { |
| 9: 0.08, |
| 10: 0.06, |
| 11: 0.05, |
| 12: 0.04, |
| 13: 0.04, |
| 14: 0.05, |
| 15: 0.07, |
| 16: 0.10, |
| } |
| else: |
| self.default_profile = default_profile |
| |
| def estimate_volume_profile(self, |
| trade_data: pd.DataFrame, |
| bucket_size: str = '30min') -> pd.Series: |
| """ |
| Estimate intraday volume profile from historical trade data. |
| |
| trade_data columns: timestamp, volume |
| """ |
| trade_data = trade_data.copy() |
| trade_data['time'] = pd.to_datetime(trade_data.index).time |
| |
| |
| profile = trade_data.resample(bucket_size)['volume'].mean() |
| |
| |
| profile = profile / profile.sum() |
| |
| return profile |
| |
| def schedule(self, order: Order, |
| start_time: pd.Timestamp, |
| end_time: Optional[pd.Timestamp] = None, |
| volume_profile: Optional[pd.Series] = None) -> pd.DataFrame: |
| """Create VWAP execution schedule""" |
| if end_time is None: |
| end_time = start_time + pd.Timedelta(hours=6) |
| |
| |
| n_buckets = self.n_buckets |
| buckets = pd.date_range(start=start_time, end=end_time, periods=n_buckets + 1) |
| |
| |
| if volume_profile is not None: |
| |
| fractions = [] |
| for i in range(n_buckets): |
| bucket_start = buckets[i] |
| hour = bucket_start.hour |
| frac = volume_profile.get(hour, 1.0 / n_buckets) |
| fractions.append(frac) |
| |
| |
| fractions = np.array(fractions) |
| fractions = fractions / fractions.sum() |
| else: |
| fractions = np.ones(n_buckets) / n_buckets |
| |
| |
| quantities = (fractions * order.quantity).astype(int) |
| |
| |
| remainder = order.quantity - quantities.sum() |
| quantities[0] += remainder |
| |
| schedule = pd.DataFrame({ |
| 'bucket_start': buckets[:-1], |
| 'bucket_end': buckets[1:], |
| 'target_qty': quantities, |
| 'fraction': fractions, |
| 'algorithm': 'VWAP', |
| 'symbol': order.symbol, |
| 'side': order.side |
| }) |
| |
| return schedule |
| |
| def execute(self, schedule: pd.DataFrame, |
| market_prices: pd.Series, |
| market_volumes: pd.Series, |
| impact_model: Optional[MarketImpactModel] = None) -> Dict: |
| """Simulate VWAP execution""" |
| if impact_model is None: |
| impact_model = MarketImpactModel() |
| |
| executed_qty = 0 |
| total_cost = 0 |
| prices = [] |
| impacts = [] |
| |
| for _, row in schedule.iterrows(): |
| qty = row['target_qty'] |
| if qty <= 0: |
| continue |
| |
| mask = market_prices.index >= row['bucket_start'] |
| if mask.any(): |
| price = market_prices[mask].iloc[0] |
| vol = market_volumes[mask].iloc[0] if len(market_volumes[mask]) > 0 else 1000000 |
| else: |
| price = market_prices.iloc[-1] |
| vol = 1000000 |
| |
| |
| impact_bps = impact_model.temporary_impact(qty, vol, 0.02) |
| impact_price = price * (1 + impact_bps / 10000) |
| |
| cost = qty * impact_price |
| total_cost += cost |
| executed_qty += qty |
| |
| prices.append(price) |
| impacts.append(impact_bps) |
| |
| vwap = total_cost / executed_qty if executed_qty > 0 else 0 |
| |
| |
| market_vwap = (market_prices * market_volumes).sum() / market_volumes.sum() |
| |
| return { |
| 'algorithm': 'VWAP', |
| 'total_qty': executed_qty, |
| 'total_cost': total_cost, |
| 'avg_price': vwap, |
| 'market_vwap': market_vwap, |
| 'vwap_deviation_bps': abs(vwap - market_vwap) / market_vwap * 10000 if market_vwap > 0 else 0, |
| 'avg_impact_bps': np.mean(impacts) if impacts else 0, |
| 'n_child_orders': len(schedule) |
| } |
|
|
|
|
| class SmartOrderRouter: |
| """ |
| Smart Order Routing: Select optimal venue/algorithm based on order characteristics. |
| |
| Decision tree: |
| - Small orders (< 1% ADV): Market/limit, single venue |
| - Medium orders (1-10% ADV): TWAP over 1-2 hours |
| - Large orders (> 10% ADV): VWAP over full day, possibly dark pools |
| - Urgent: Market order, accept impact |
| - Patient: TWAP/VWAP, minimize impact |
| """ |
| |
| def __init__(self, impact_model: Optional[MarketImpactModel] = None): |
| self.impact_model = impact_model or MarketImpactModel() |
| self.twap = TWAPScheduler() |
| self.vwap = VWAPScheduler() |
| |
| def route_order(self, order: Order, |
| avg_daily_volume: int, |
| urgency: str = 'normal', |
| volatility: float = 0.02) -> Dict: |
| """ |
| Route order to optimal execution strategy. |
| |
| Args: |
| order: Order specification |
| avg_daily_volume: Average daily volume of the symbol |
| urgency: 'urgent', 'normal', 'patient' |
| volatility: Daily volatility |
| |
| Returns: |
| Dict with routing decision and execution schedule |
| """ |
| participation = order.quantity / max(avg_daily_volume, 1) |
| |
| |
| if urgency == 'urgent' or participation < 0.01: |
| |
| strategy = 'market' |
| expected_impact = self.impact_model.temporary_impact( |
| order.quantity, avg_daily_volume, volatility |
| ) |
| schedule = pd.DataFrame({ |
| 'bucket_start': [pd.Timestamp.now()], |
| 'bucket_end': [pd.Timestamp.now()], |
| 'target_qty': [order.quantity], |
| 'fraction': [1.0], |
| 'algorithm': 'MARKET', |
| 'symbol': [order.symbol], |
| 'side': [order.side] |
| }) |
| |
| elif participation < 0.05: |
| |
| strategy = 'twap' |
| schedule = self.twap.schedule( |
| order, pd.Timestamp.now(), |
| end_time=pd.Timestamp.now() + pd.Timedelta(hours=2) |
| ) |
| expected_impact = self.impact_model.temporary_impact( |
| order.quantity // len(schedule), avg_daily_volume, volatility |
| ) |
| |
| else: |
| |
| strategy = 'vwap' |
| schedule = self.vwap.schedule( |
| order, pd.Timestamp.now(), |
| end_time=pd.Timestamp.now() + pd.Timedelta(hours=6) |
| ) |
| expected_impact = self.impact_model.temporary_impact( |
| order.quantity // len(schedule), avg_daily_volume, volatility |
| ) |
| |
| return { |
| 'order': order, |
| 'strategy': strategy, |
| 'participation_rate': participation, |
| 'expected_impact_bps': expected_impact, |
| 'schedule': schedule, |
| 'urgency': urgency |
| } |
|
|
|
|
| def benchmark_execution_algorithms(): |
| """Compare TWAP vs VWAP vs Market order on synthetic data""" |
| np.random.seed(42) |
| |
| |
| n_minutes = 390 |
| times = pd.date_range('2024-01-01 09:30', periods=n_minutes, freq='1min') |
| |
| |
| price = 100.0 |
| prices = [price] |
| for _ in range(n_minutes - 1): |
| price *= (1 + np.random.randn() * 0.001) |
| prices.append(price) |
| |
| |
| base_vol = 1000 |
| hours = np.arange(n_minutes) / 60 |
| vol_pattern = 0.5 + 2.0 * np.exp(-((hours - 0.5) ** 2) / 0.1) + \ |
| 0.5 * np.sin(hours * np.pi) |
| volumes = (base_vol * vol_pattern * (1 + np.random.randn(n_minutes) * 0.2)).astype(int) |
| volumes = np.maximum(volumes, 100) |
| |
| price_series = pd.Series(prices, index=times) |
| volume_series = pd.Series(volumes, index=times) |
| |
| |
| order = Order(symbol='AAPL', side='buy', quantity=50000, order_type='twap') |
| |
| |
| twap = TWAPScheduler(n_buckets=20) |
| twap_schedule = twap.schedule(order, times[0]) |
| twap_result = twap.execute(twap_schedule, price_series, |
| daily_volume=volumes.sum(), volatility=0.02) |
| |
| |
| vwap = VWAPScheduler(n_buckets=20) |
| vwap_schedule = vwap.schedule(order, times[0], |
| volume_profile=None) |
| vwap_result = vwap.execute(vwap_schedule, price_series, volume_series) |
| |
| |
| market_impact = MarketImpactModel() |
| market_price = price_series.iloc[0] |
| impact = market_impact.temporary_impact(50000, volumes.sum(), 0.02) |
| market_cost = 50000 * market_price * (1 + impact / 10000) |
| |
| print("=" * 60) |
| print("EXECUTION ALGORITHM BENCHMARK") |
| print("=" * 60) |
| print(f"\nOrder: Buy 50,000 AAPL shares") |
| print(f"ADV: {volumes.sum():,} | Participation: {50000/volumes.sum()*100:.1f}%") |
| print() |
| print(f"MARKET ORDER:") |
| print(f" Cost: ${market_cost:,.2f} | Impact: {impact:.1f} bps | Slippage: {impact:.1f} bps") |
| print() |
| print(f"TWAP:") |
| print(f" Cost: ${twap_result['total_cost']:,.2f} | Impact: {twap_result['avg_impact_bps']:.1f} bps") |
| print(f" Avg Price: ${twap_result['avg_price']:.2f} | Child Orders: {twap_result['n_child_orders']}") |
| print() |
| print(f"VWAP:") |
| print(f" Cost: ${vwap_result['total_cost']:,.2f} | Impact: {vwap_result['avg_impact_bps']:.1f} bps") |
| print(f" Avg Price: ${vwap_result['avg_price']:.2f}") |
| print(f" Market VWAP: ${vwap_result['market_vwap']:.2f} | Deviation: {vwap_result['vwap_deviation_bps']:.1f} bps") |
| print() |
| |
| savings_twap = (market_cost - twap_result['total_cost']) / market_cost * 100 |
| savings_vwap = (market_cost - vwap_result['total_cost']) / market_cost * 100 |
| print(f"Savings vs Market Order:") |
| print(f" TWAP: {savings_twap:.2f}% | VWAP: {savings_vwap:.2f}%") |
|
|
|
|
| if __name__ == '__main__': |
| benchmark_execution_algorithms() |
|
|