# alphaforge-quant-system / execution_algorithms.py
# Premchan369's picture
# Add execution algorithms: TWAP, VWAP, Smart Order Router with market impact model
# b6c23e5 verified
"""Execution Algorithms: TWAP, VWAP, Smart Order Routing
What separates retail execution from institutional execution:
- Retail: Market orders, immediate execution, pay spread
- Institutional: TWAP/VWAP, slice orders across time, minimize market impact
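Market impact model: price moves against you as order size grows relative to daily volume (temporary impact scales with the square root of order size / daily volume, permanent impact scales linearly with it)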
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import warnings
# NOTE(review): called with only the 'ignore' action, this filter matches every
# warning category, so it silences warnings for the whole process (including
# code that merely imports this module), not just for this file.
warnings.filterwarnings('ignore')
@dataclass
class Order:
    """Specification of a single order.

    ``side`` and ``order_type`` are stored lower-cased, so callers may pass
    them in any letter case.
    """

    symbol: str
    side: str  # 'buy' or 'sell'
    quantity: int
    order_type: str  # 'market', 'limit', 'twap', 'vwap'
    limit_price: Optional[float] = None

    def __post_init__(self):
        # Normalise the two free-text fields, in declaration order.
        for field_name in ("side", "order_type"):
            setattr(self, field_name, getattr(self, field_name).lower())
class MarketImpactModel:
    """
    Square-root market impact model (labelled "Almgren-Chriss, 1999" by the
    original author).

    Temporary impact = temp_impact_coef * σ * sqrt(|Q| / V)
    Permanent impact = perm_impact_coef * σ * (Q / V)

    Where:
    - σ = daily volatility (callers in this file pass 0.02; presumably a
      fraction, i.e. 2% -- confirm)
    - Q = order quantity
    - V = daily volume (floored at 1 so the ratio is always defined)

    Both methods return the impact in basis points. Callers in this file apply
    the result as ``price * (1 + impact_bps / 10000)``, so the raw fractional
    value is scaled by 10,000 here; returning the bare fraction would make the
    applied impact 10,000 times too small.
    """

    # Basis points per unit of fractional price move (1.0 == 10,000 bps).
    BPS_PER_UNIT = 10_000.0

    def __init__(self,
                 temp_impact_coef: float = 0.5,
                 perm_impact_coef: float = 0.1,
                 decay_halflife: int = 10):
        """
        Args:
            temp_impact_coef: Multiplier applied to the square-root term.
            perm_impact_coef: Multiplier applied to the linear term.
            decay_halflife: Stored on the instance only; neither impact method
                reads it. (Units are not established by this class.)
        """
        self.temp_impact_coef = temp_impact_coef
        self.perm_impact_coef = perm_impact_coef
        self.decay_halflife = decay_halflife

    def temporary_impact(self, order_size: int, daily_volume: int,
                         volatility: float) -> float:
        """Temporary price impact (bps).

        Uses the magnitude of ``order_size`` so a negative (signed) quantity
        yields a finite impact instead of NaN from the square root.
        """
        participation = abs(order_size) / max(daily_volume, 1)
        fractional = self.temp_impact_coef * volatility * np.sqrt(participation)
        return fractional * self.BPS_PER_UNIT

    def permanent_impact(self, order_size: int, daily_volume: int,
                         volatility: float) -> float:
        """Permanent price impact (bps), linear in the participation rate."""
        participation = order_size / max(daily_volume, 1)
        fractional = self.perm_impact_coef * volatility * participation
        return fractional * self.BPS_PER_UNIT
class TWAPScheduler:
    """
    Time-Weighted Average Price execution.

    Slices a parent order into N child orders spread evenly over time.

    When to use: when you want to minimize timing risk and have no view
    on intraday price direction. Simple, predictable, low market impact.

    Formula: child qty = total qty / N buckets (any remainder goes to the
    earliest buckets, one unit each).
    """

    def __init__(self,
                 n_buckets: int = 20,
                 bucket_duration_minutes: int = 15):
        """
        Args:
            n_buckets: Number of child orders / time buckets.
            bucket_duration_minutes: Length of each bucket; only used to
                derive the end time when ``schedule`` is not given one.
        """
        self.n_buckets = n_buckets
        self.bucket_duration = bucket_duration_minutes

    @staticmethod
    def _side_sign(side) -> float:
        """Return -1.0 for a sell (impact lowers the fill), +1.0 otherwise."""
        return -1.0 if str(side).lower() == 'sell' else 1.0

    def schedule(self, order: "Order",
                 start_time: pd.Timestamp,
                 end_time: Optional[pd.Timestamp] = None) -> pd.DataFrame:
        """
        Create a TWAP execution schedule.

        Args:
            order: Object exposing ``quantity``, ``symbol`` and ``side``.
            start_time: Start of the first bucket.
            end_time: End of the last bucket. Defaults to
                ``start_time + n_buckets * bucket_duration`` minutes.

        Returns:
            DataFrame with one row per bucket and the columns bucket_start,
            bucket_end, target_qty, fraction, algorithm, symbol, side.

        Raises:
            ValueError: If ``n_buckets`` is smaller than 1.
        """
        if self.n_buckets < 1:
            raise ValueError("n_buckets must be at least 1")

        if end_time is None:
            end_time = start_time + pd.Timedelta(
                minutes=self.n_buckets * self.bucket_duration
            )

        # n_buckets + 1 evenly spaced edges delimit n_buckets intervals.
        edges = pd.date_range(
            start=start_time,
            end=end_time,
            periods=self.n_buckets + 1
        )

        # Equal split; the first `remainder` buckets take one extra unit.
        base_qty, remainder = divmod(order.quantity, self.n_buckets)
        quantities = [
            base_qty + 1 if i < remainder else base_qty
            for i in range(self.n_buckets)
        ]

        return pd.DataFrame({
            'bucket_start': edges[:-1],
            'bucket_end': edges[1:],
            'target_qty': quantities,
            'fraction': 1.0 / self.n_buckets,
            'algorithm': 'TWAP',
            'symbol': order.symbol,
            'side': order.side
        })

    def execute(self, schedule: pd.DataFrame,
                market_prices: pd.Series,
                impact_model: Optional["MarketImpactModel"] = None,
                daily_volume: int = 1000000,
                volatility: float = 0.02) -> Dict:
        """
        Simulate TWAP execution with market impact.

        Each child order fills at the first market price at or after its
        bucket start (the last available price if there is none), moved
        against the trader by the temporary impact: up for buys, down for
        sells. The side is read from the schedule's ``side`` column; rows
        without one are treated as buys.

        Args:
            schedule: Output of ``schedule``.
            market_prices: Prices indexed by timestamp.
            impact_model: Object with ``temporary_impact(qty, volume, vol)``
                returning bps; a default ``MarketImpactModel`` if omitted.
            daily_volume: Volume passed to the impact model.
            volatility: Volatility passed to the impact model.

        Returns:
            Dict of execution statistics. ``total_cost`` is the summed fill
            notional (i.e. proceeds for a sell). An empty schedule yields
            zeros instead of raising.
        """
        if impact_model is None:
            impact_model = MarketImpactModel()

        executed_qty = 0
        total_cost = 0
        impacts = []

        for _, row in schedule.iterrows():
            qty = row['target_qty']

            # Price at bucket start (approximation).
            mask = market_prices.index >= row['bucket_start']
            if mask.any():
                price = market_prices[mask].iloc[0]
            else:
                price = market_prices.iloc[-1]

            impact_bps = impact_model.temporary_impact(
                qty, daily_volume, volatility
            )
            # Impact always hurts: buys pay more, sells receive less.
            sign = self._side_sign(row.get('side', 'buy'))
            fill_price = price * (1 + sign * impact_bps / 10000)

            total_cost += qty * fill_price
            executed_qty += qty
            impacts.append(impact_bps)

        # Quantity-weighted average fill price.
        avg_price = total_cost / executed_qty if executed_qty > 0 else 0

        # np.mean / np.max misbehave on an empty list, so guard explicitly.
        avg_impact = np.mean(impacts) if impacts else 0.0
        max_impact = np.max(impacts) if impacts else 0.0

        return {
            'algorithm': 'TWAP',
            'total_qty': executed_qty,
            'total_cost': total_cost,
            'avg_price': avg_price,
            'avg_impact_bps': avg_impact,
            'max_impact_bps': max_impact,
            'slippage_bps': avg_impact,
            'n_child_orders': len(schedule)
        }
class VWAPScheduler:
    """
    Volume-Weighted Average Price execution.

    Slices a parent order in proportion to an intraday volume profile, so
    more is executed in high-volume periods.

    When to use: when you want to match the market VWAP.
    Institutional benchmark: did my execution VWAP match the market VWAP?

    Formula: child qty_i = total qty * (volume_i / total volume)
    """

    # Volume assumed for a bucket when no market volume bar is available.
    FALLBACK_VOLUME = 1000000

    def __init__(self,
                 n_buckets: int = 20,
                 default_profile: Optional[Dict[int, float]] = None):
        """
        Args:
            n_buckets: Number of child orders / time buckets.
            default_profile: Mapping of hour of day -> volume weight, used by
                ``schedule`` when no explicit profile is passed. Defaults to
                the built-in table below.
        """
        self.n_buckets = n_buckets
        if default_profile is None:
            # Hour of day -> volume weight (simplified; high at the edges of
            # the session, low mid-day).
            self.default_profile = {
                9: 0.08,   # 9-10 AM: High
                10: 0.06,
                11: 0.05,
                12: 0.04,  # Mid-day lull
                13: 0.04,
                14: 0.05,
                15: 0.07,
                # Originally commented "3-4 PM: High (close)".
                # NOTE(review): key 16 is the 16:00 hour -- confirm which
                # hour this weight is meant for.
                16: 0.10,
            }
        else:
            self.default_profile = default_profile

    @staticmethod
    def _side_sign(side) -> float:
        """Return -1.0 for a sell (impact lowers the fill), +1.0 otherwise."""
        return -1.0 if str(side).lower() == 'sell' else 1.0

    def estimate_volume_profile(self,
                                trade_data: pd.DataFrame,
                                bucket_size: str = '30min') -> pd.Series:
        """
        Estimate a normalized volume profile from historical trade data.

        Args:
            trade_data: Frame with a ``volume`` column and a datetime index
                (the index, not a column, supplies the timestamps).
            bucket_size: Resampling frequency.

        Returns:
            Series of the mean ``volume`` per resampled bucket, divided by
            the sum of those means.

        NOTE(review): the result is indexed by bucket timestamp, whereas
        ``schedule`` looks profiles up by integer hour of day, so this
        output needs re-keying before it can be passed to ``schedule``.
        """
        per_bucket = trade_data['volume'].resample(bucket_size).mean()
        # Normalize to fractions.
        return per_bucket / per_bucket.sum()

    def schedule(self, order: "Order",
                 start_time: pd.Timestamp,
                 end_time: Optional[pd.Timestamp] = None,
                 volume_profile: Optional[pd.Series] = None) -> pd.DataFrame:
        """
        Create a VWAP execution schedule.

        Args:
            order: Object exposing ``quantity``, ``symbol`` and ``side``.
            start_time: Start of the first bucket.
            end_time: End of the last bucket; defaults to six hours after
                ``start_time``.
            volume_profile: Hour of day -> weight lookup (anything with a
                ``.get(hour, default)`` method). When omitted, the
                instance's ``default_profile`` is used. Pass an empty
                mapping to force a uniform split.

        Returns:
            DataFrame with one row per bucket and the columns bucket_start,
            bucket_end, target_qty, fraction, algorithm, symbol, side.

        Raises:
            ValueError: If ``n_buckets`` is smaller than 1.
        """
        n_buckets = self.n_buckets
        if n_buckets < 1:
            raise ValueError("n_buckets must be at least 1")

        if end_time is None:
            end_time = start_time + pd.Timedelta(hours=6)

        edges = pd.date_range(start=start_time, end=end_time, periods=n_buckets + 1)

        uniform = 1.0 / n_buckets
        # Without an explicit profile, fall back to the instance default so
        # the schedule is actually volume-weighted rather than uniform.
        profile = volume_profile if volume_profile is not None else self.default_profile

        if profile is not None:
            # Hours missing from the profile get the uniform weight.
            fractions = np.array(
                [profile.get(edge.hour, uniform) for edge in edges[:-1]],
                dtype=float
            )
            total = fractions.sum()
            if np.isfinite(total) and total > 0:
                fractions = fractions / total
            else:
                # Unusable weights (zero sum or NaN) -> uniform split.
                fractions = np.full(n_buckets, uniform)
        else:
            fractions = np.full(n_buckets, uniform)

        # Truncate to whole units, then give the shortfall to the first bucket.
        quantities = (fractions * order.quantity).astype(int)
        quantities[0] += order.quantity - quantities.sum()

        return pd.DataFrame({
            'bucket_start': edges[:-1],
            'bucket_end': edges[1:],
            'target_qty': quantities,
            'fraction': fractions,
            'algorithm': 'VWAP',
            'symbol': order.symbol,
            'side': order.side
        })

    def execute(self, schedule: pd.DataFrame,
                market_prices: pd.Series,
                market_volumes: pd.Series,
                impact_model: Optional["MarketImpactModel"] = None,
                volatility: float = 0.02) -> Dict:
        """
        Simulate VWAP execution.

        Rows with a non-positive target quantity are skipped. Each remaining
        child order fills at the first price at or after its bucket start,
        moved against the trader by the temporary impact: up for buys, down
        for sells (side read from the schedule's ``side`` column; missing
        means buy).

        Args:
            schedule: Output of ``schedule``.
            market_prices: Prices indexed by timestamp.
            market_volumes: Volumes indexed by timestamp.
            impact_model: Object with ``temporary_impact(qty, volume, vol)``
                returning bps; a default ``MarketImpactModel`` if omitted.
            volatility: Volatility passed to the impact model.

        Returns:
            Dict of execution statistics. ``n_child_orders`` is the number of
            schedule rows, including skipped ones. ``market_vwap`` is 0 when
            total market volume is not positive.
        """
        if impact_model is None:
            impact_model = MarketImpactModel()

        executed_qty = 0
        total_cost = 0
        impacts = []

        for _, row in schedule.iterrows():
            qty = row['target_qty']
            if qty <= 0:
                continue

            bucket_start = row['bucket_start']
            price_mask = market_prices.index >= bucket_start
            if price_mask.any():
                price = market_prices[price_mask].iloc[0]
                # Build the volume mask from the volume index itself so the
                # two series need not share a length or index.
                vol_mask = market_volumes.index >= bucket_start
                if vol_mask.any():
                    vol = market_volumes[vol_mask].iloc[0]
                else:
                    vol = self.FALLBACK_VOLUME
            else:
                price = market_prices.iloc[-1]
                vol = self.FALLBACK_VOLUME

            # NOTE(review): `vol` is the volume of a single bar, yet it is
            # passed in the impact model's daily-volume slot -- confirm this
            # is the intended participation denominator.
            impact_bps = impact_model.temporary_impact(qty, vol, volatility)
            # Impact always hurts: buys pay more, sells receive less.
            sign = self._side_sign(row.get('side', 'buy'))
            fill_price = price * (1 + sign * impact_bps / 10000)

            total_cost += qty * fill_price
            executed_qty += qty
            impacts.append(impact_bps)

        vwap = total_cost / executed_qty if executed_qty > 0 else 0

        # Market VWAP (what we tried to match); guard against zero volume.
        total_volume = market_volumes.sum()
        if total_volume > 0:
            market_vwap = (market_prices * market_volumes).sum() / total_volume
        else:
            market_vwap = 0.0

        return {
            'algorithm': 'VWAP',
            'total_qty': executed_qty,
            'total_cost': total_cost,
            'avg_price': vwap,
            'market_vwap': market_vwap,
            'vwap_deviation_bps': abs(vwap - market_vwap) / market_vwap * 10000 if market_vwap > 0 else 0,
            'avg_impact_bps': np.mean(impacts) if impacts else 0,
            'n_child_orders': len(schedule)
        }
class SmartOrderRouter:
    """
    Smart Order Routing: pick an execution strategy from order size relative
    to average daily volume (ADV) and from urgency.

    Decision tree as implemented by ``route_order``:
    - Urgent (any size) or small (< 1% ADV): single market order
    - Medium (1% to < 5% ADV): TWAP over 2 hours
    - Large (>= 5% ADV): VWAP over 6 hours

    The two cut-offs are the class attributes below and can be overridden on
    a subclass or an instance.
    """

    # Participation below this goes out as one market order.
    SMALL_ORDER_PARTICIPATION = 0.01
    # Participation below this (and at/above the small cut-off) uses TWAP;
    # anything at or above it uses VWAP.
    MEDIUM_ORDER_PARTICIPATION = 0.05

    def __init__(self, impact_model: Optional["MarketImpactModel"] = None):
        """
        Args:
            impact_model: Object with ``temporary_impact(qty, volume, vol)``;
                a default ``MarketImpactModel`` is created when omitted.
        """
        # Explicit None check: do not depend on the model's truthiness.
        self.impact_model = MarketImpactModel() if impact_model is None else impact_model
        self.twap = TWAPScheduler()
        self.vwap = VWAPScheduler()

    def route_order(self, order: "Order",
                    avg_daily_volume: int,
                    urgency: str = 'normal',
                    volatility: float = 0.02) -> Dict:
        """
        Route an order to an execution strategy.

        Args:
            order: Order specification (needs quantity, symbol, side).
            avg_daily_volume: Average daily volume of the symbol; floored at
                1 when computing the participation rate.
            urgency: 'urgent', 'normal' or 'patient' (case-insensitive). Only
                'urgent' changes the decision; every other value is sized
                purely by participation.
            volatility: Daily volatility passed to the impact model.

        Returns:
            Dict with the order, chosen strategy ('market', 'twap' or
            'vwap'), participation rate, expected impact of one child order
            (as returned by the impact model), the execution schedule, and
            the urgency exactly as passed in.
        """
        participation = order.quantity / max(avg_daily_volume, 1)
        is_urgent = str(urgency).lower() == 'urgent'

        # Read the clock once so every timestamp in the schedule is
        # consistent and the window is exactly as long as stated.
        now = pd.Timestamp.now()

        if is_urgent or participation < self.SMALL_ORDER_PARTICIPATION:
            # Small or urgent: one order for the full quantity, right now.
            strategy = 'market'
            child_qty = order.quantity
            schedule = pd.DataFrame({
                'bucket_start': [now],
                'bucket_end': [now],
                'target_qty': [order.quantity],
                'fraction': [1.0],
                'algorithm': 'MARKET',
                'symbol': [order.symbol],
                'side': [order.side]
            })
        elif participation < self.MEDIUM_ORDER_PARTICIPATION:
            # Medium: TWAP over 2 hours.
            strategy = 'twap'
            schedule = self.twap.schedule(
                order, now, end_time=now + pd.Timedelta(hours=2)
            )
            child_qty = order.quantity // len(schedule)
        else:
            # Large: VWAP over 6 hours.
            strategy = 'vwap'
            schedule = self.vwap.schedule(
                order, now, end_time=now + pd.Timedelta(hours=6)
            )
            child_qty = order.quantity // len(schedule)

        expected_impact = self.impact_model.temporary_impact(
            child_qty, avg_daily_volume, volatility
        )

        return {
            'order': order,
            'strategy': strategy,
            'participation_rate': participation,
            'expected_impact_bps': expected_impact,
            'schedule': schedule,
            'urgency': urgency
        }
def benchmark_execution_algorithms():
    """Print a TWAP vs VWAP vs single market order comparison.

    Builds one synthetic session of minute bars (seeded, so repeatable),
    runs a 50,000-share buy through each approach and prints cost, impact
    and savings relative to the market order. Returns nothing.
    """
    np.random.seed(42)

    # One bar per minute: 390 bars starting at 09:30.
    n_minutes = 390
    session_index = pd.date_range('2024-01-01 09:30', periods=n_minutes, freq='1min')

    # Price path: multiplicative random walk, one scalar draw per step.
    path = [100.0]
    while len(path) < n_minutes:
        path.append(path[-1] * (1 + np.random.randn() * 0.001))

    # Volume: deterministic intraday shape times per-bar noise, floored at 100.
    elapsed_hours = np.arange(n_minutes) / 60
    early_bump = 2.0 * np.exp(-((elapsed_hours - 0.5) ** 2) / 0.1)
    wave = 0.5 * np.sin(elapsed_hours * np.pi)
    vol_pattern = 0.5 + early_bump + wave
    noise = 1 + np.random.randn(n_minutes) * 0.2
    volumes = np.maximum((1000 * vol_pattern * noise).astype(int), 100)

    price_series = pd.Series(path, index=session_index)
    volume_series = pd.Series(volumes, index=session_index)

    order = Order(symbol='AAPL', side='buy', quantity=50000, order_type='twap')
    session_open = session_index[0]

    # TWAP: 20 equal slices, impact measured against the whole day's volume.
    twap_algo = TWAPScheduler(n_buckets=20)
    twap_result = twap_algo.execute(
        twap_algo.schedule(order, session_open),
        price_series,
        daily_volume=volumes.sum(),
        volatility=0.02,
    )

    # VWAP: 20 slices, no explicit volume profile supplied.
    vwap_algo = VWAPScheduler(n_buckets=20)
    vwap_result = vwap_algo.execute(
        vwap_algo.schedule(order, session_open, volume_profile=None),
        price_series,
        volume_series,
    )

    # Market: the full quantity in one order at the opening price.
    impact = MarketImpactModel().temporary_impact(order.quantity, volumes.sum(), 0.02)
    market_cost = order.quantity * price_series.iloc[0] * (1 + impact / 10000)

    print("=" * 60)
    print("EXECUTION ALGORITHM BENCHMARK")
    print("=" * 60)
    print(f"\nOrder: Buy 50,000 AAPL shares")
    print(f"ADV: {volumes.sum():,} | Participation: {50000/volumes.sum()*100:.1f}%")
    print()
    print(f"MARKET ORDER:")
    print(f" Cost: ${market_cost:,.2f} | Impact: {impact:.1f} bps | Slippage: {impact:.1f} bps")
    print()
    print(f"TWAP:")
    print(f" Cost: ${twap_result['total_cost']:,.2f} | Impact: {twap_result['avg_impact_bps']:.1f} bps")
    print(f" Avg Price: ${twap_result['avg_price']:.2f} | Child Orders: {twap_result['n_child_orders']}")
    print()
    print(f"VWAP:")
    print(f" Cost: ${vwap_result['total_cost']:,.2f} | Impact: {vwap_result['avg_impact_bps']:.1f} bps")
    print(f" Avg Price: ${vwap_result['avg_price']:.2f}")
    print(f" Market VWAP: ${vwap_result['market_vwap']:.2f} | Deviation: {vwap_result['vwap_deviation_bps']:.1f} bps")
    print()

    # Relative saving of each algorithm against the single market order.
    savings_twap = (market_cost - twap_result['total_cost']) / market_cost * 100
    savings_vwap = (market_cost - vwap_result['total_cost']) / market_cost * 100
    print(f"Savings vs Market Order:")
    print(f" TWAP: {savings_twap:.2f}% | VWAP: {savings_vwap:.2f}%")
# Run the synthetic benchmark only when this file is executed as a script.
if __name__ == "__main__":
    benchmark_execution_algorithms()