QuantFlux Alpha (Test Model for 3.0): Technical Architecture & Implementation Details
Table of Contents
- System Overview
- Dollar Bar Aggregation
- Feature Engineering Pipeline
- Model Training & Optimization
- Signal Generation Logic
- Risk Management Framework
- Data Processing Pipeline
- Deployment Architecture
System Overview
End-to-End Flow
Binance WebSocket (Real-time ticks)
↓
Tick Validation & Cleaning
↓
Dollar Bar Aggregator ($500k threshold)
↓
Feature Computation Engine
↓
XGBoost Model Inference
↓
Risk Management Checks (6 layers)
↓
Order Execution & P&L Tracking
Key Components
1. Data Ingestion Layer
- Binance perpetual futures WebSocket (BTC/USDT)
- Tick-level data: price, quantity, side, timestamp
- Validation: Remove duplicates, out-of-order ticks, invalid quotes
2. Dollar Bar Generator
- Aggregates ticks by dollar volume ($500,000 per bar)
- Eliminates look-ahead bias (timestamps at bar completion)
- Reduces autocorrelation by 10-20% vs time bars
- Output: OHLCV candles with derived metrics (VWAP, ATR, RSI)
3. Feature Engineering
- 17 core features from dollar bars
- Computation: 50-bar rolling windows (no look-ahead)
- Normalization: StandardScaler (mean=0, std=1)
- Output: Feature vectors for model inference
4. Model Inference
- XGBoost classifier (2,000 trees, depth=7)
- Input: 17-dimensional feature vector
- Output: Binary prediction (0/1) + confidence probability
- Latency: <50ms single prediction (target <100ms batch)
5. Risk Management
- 6 independent enforcement layers
- Pre-trade checks (position limits, volatility filters)
- In-trade monitoring (stop-loss, take-profit)
- Post-trade validation (daily/weekly loss limits)
6. Execution Engine
- Paper trading (simulated fills)
- Order management (entry, stop-loss, take-profit)
- P&L calculation (slippage-aware)
- Logging and audit trails
Dollar Bar Aggregation
Motivation
Traditional time-based bars (1H, 4H) introduce biases:
- Unequal information content (busy hours vs quiet hours)
- Look-ahead bias (closing price depends on future ticks)
- Autocorrelation (nearby bars correlated)
Dollar bars fix this: Sample by volume, not time.
Algorithm
from math import inf

def create_dollar_bar(ticks, dollar_threshold=500_000):
    """
    Aggregate ticks into bars by cumulative dollar volume.
    Args:
        ticks: Iterable of (price, quantity, timestamp)
        dollar_threshold: Dollar volume per bar (e.g., $500k)
    Returns:
        bars: List of dicts with open/high/low/close/volume/vwap/timestamp
    """
    bars = []
    dollar_volume = 0
    bar_open = None
    bar_high = -inf
    bar_low = inf
    bar_volume = 0
    vwap_num = 0  # Numerator: sum(price * quantity)
    vwap_den = 0  # Denominator: sum(quantity)
    bar_start_time = None
    for price, quantity, timestamp in ticks:
        dollar_value = price * quantity
        # Initialize bar on first tick
        if bar_open is None:
            bar_open = price
            bar_start_time = timestamp
        # Update bar statistics
        bar_high = max(bar_high, price)
        bar_low = min(bar_low, price)
        bar_volume += quantity
        dollar_volume += dollar_value
        vwap_num += price * quantity
        vwap_den += quantity
        # Check if bar complete
        if dollar_volume >= dollar_threshold:
            vwap = vwap_num / vwap_den
            bar_close = price
            bars.append({
                'timestamp': timestamp,  # Last tick time (no look-ahead)
                'open': bar_open,
                'high': bar_high,
                'low': bar_low,
                'close': bar_close,
                'volume': bar_volume,
                'vwap': vwap,
                'duration_seconds': timestamp - bar_start_time
            })
            # Reset for next bar
            dollar_volume = 0
            bar_open = None
            bar_high = -inf
            bar_low = inf
            bar_volume = 0
            vwap_num = 0
            vwap_den = 0
    return bars
Advantages
- Look-Ahead Prevention: Timestamped at last tick, no future data used
- Information Normalization: Each bar represents ~equal market participation
- Autocorrelation Reduction: High-volume periods produce more bars, spreading correlation
- Microstructure Preservation: Captures real-time liquidity patterns
Threshold Selection
- $500k per bar: Balances granularity vs bar frequency
- At BTC = $50k, each bar aggregates ~10 BTC of traded volume
- Produces ~50-100 bars/day in active periods
- Bar duration adapts to activity: short bars during volume bursts, long bars in quiet periods
- Alternative thresholds:
- $100k: Very granular, ~200-300 bars/day (high noise)
- $1M: Coarse, ~20-30 bars/day (low signal resolution)
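As a rough sizing aid, the expected bar count follows directly from daily traded dollar volume divided by the threshold. A minimal sketch (the $40M daily-volume figure is purely illustrative, not a measured statistic):

```python
def bars_per_day(daily_dollar_volume: float, threshold: float) -> float:
    """Expected number of dollar bars emitted per day."""
    return daily_dollar_volume / threshold

# e.g. a hypothetical $40M of daily BTC/USDT flow at a $500k threshold
print(bars_per_day(40_000_000, 500_000))  # -> 80.0
```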
Feature Engineering Pipeline
Feature Categories
1. Price Action (5 features)
ret_1: Lag-1 Return
def ret_1(bars):
"""Single bar return (momentum)."""
close = bars['close'].shift(1) # No look-ahead
return close.pct_change()
- Captures immediate momentum
- Used for mean-reversion identification
- Importance: 4.93%
ret_3: 3-Bar Return
def ret_3(bars):
"""3-bar cumulative return."""
close = bars['close'].shift(1)
return (close / close.shift(2) - 1)
- Medium-term trend confirmation
- Smooths out single-bar noise
- Importance: 4.95%
ret_5: 5-Bar Return
def ret_5(bars):
"""5-bar cumulative return."""
close = bars['close'].shift(1)
return (close / close.shift(4) - 1)
- Regime identification (bull vs bear)
- Filters out short-term noise
- Importance: 4.96%
ret_accel: Return Acceleration
def ret_accel(bars):
"""Change in momentum (second derivative)."""
close = bars['close'].shift(1)
ret = close.pct_change()
return ret.diff()
- Detects momentum reversals
- Peaks before trend changes
- Importance: 4.99%
close_pos: Close Position
def close_pos(bars):
"""Position of close within 20-bar range."""
close = bars['close'].shift(1)
high_20 = bars['high'].shift(1).rolling(20).max()
low_20 = bars['low'].shift(1).rolling(20).min()
return (close - low_20) / (high_20 - low_20)
- 0.0: At 20-bar low (oversold)
- 1.0: At 20-bar high (overbought)
- 0.5: Neutral (mean reversion opportunity)
- Importance: 4.82%
2. Volume Features (3 features)
vol_20: 20-Bar Volume Mean
def vol_20(bars):
"""Expected volume baseline."""
return bars['volume'].shift(1).rolling(20).mean()
- Baseline for volume anomalies
- Normalized by market regime
- Importance: 5.08%
high_vol: Volume Spike
def high_vol(bars):
"""Binary: volume above 1.5x average."""
volume = bars['volume'].shift(1)
vol_mean = volume.rolling(20).mean()
return (volume > vol_mean * 1.5).astype(int)
- Confidence flag for trades
- Indicates institutional activity
- Importance: 4.74%
low_vol: Volume Drought
def low_vol(bars):
"""Binary: volume below 0.7x average."""
volume = bars['volume'].shift(1)
vol_mean = volume.rolling(20).mean()
return (volume < vol_mean * 0.7).astype(int)
- Risk flag (thin liquidity)
- May precede price gaps
- Importance: 4.80%
3. Volatility Features (2 features)
rsi_oversold: RSI < 30
def rsi_oversold(bars):
"""Relative Strength Index oversold condition."""
close = bars['close'].shift(1)
delta = close.diff()
gain = (delta.where(delta > 0, 0)).rolling(14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return (rsi < 30).astype(int)
- Oversold = bounce opportunity
- 14-period standard RSI calculation
- Importance: 5.07%
rsi_neutral: 30 <= RSI <= 70
def rsi_neutral(bars):
"""RSI in neutral zone (no extreme conditions)."""
close = bars['close'].shift(1)
# [RSI calculation as above]
return ((rsi >= 30) & (rsi <= 70)).astype(int)
- Normal volatility regime
- Avoid extreme conditions
- Importance: 5.14% (highest!)
4. MACD Features (1 feature)
macd_positive: MACD > 0
def macd_positive(bars):
"""Moving Average Convergence Divergence signal."""
close = bars['close'].shift(1)
ema12 = close.ewm(span=12, adjust=False).mean()
ema26 = close.ewm(span=26, adjust=False).mean()
macd = ema12 - ema26
return (macd > 0).astype(int)
- Bullish trend confirmation
- Cross above zero = potential buy
- Importance: 4.77%
5. Time-of-Day Features (4 features)
london_open: London Session Open
def london_open(bars):
"""Binary: 8:00 UTC ±30 minutes."""
hour = bars['timestamp'].dt.hour
minute = bars['timestamp'].dt.minute
is_london = (hour == 8) & (minute >= 0) & (minute < 30)
return is_london.astype(int)
- Highest daily volatility period
- Institutional order flow
- Importance: 5.08%
london_close: London Session Close
def london_close(bars):
"""Binary: 16:30 UTC ±30 minutes."""
hour = bars['timestamp'].dt.hour
minute = bars['timestamp'].dt.minute
is_close = (hour == 16) & (minute >= 30) & (minute < 60)
return is_close.astype(int)
- Position unwinding
- End-of-session volatility
- Importance: 4.70%
nyse_open: NYSE Open
def nyse_open(bars):
"""Binary: 13:30 UTC ±30 minutes."""
hour = bars['timestamp'].dt.hour
minute = bars['timestamp'].dt.minute
is_open = (hour == 13) & (minute >= 30) & (minute < 60)
return is_open.astype(int)
- Equities-crypto correlation spike
- Derivative hedging flows
- Importance: 5.02%
hour: Hour of Day
def hour(bars):
"""Hour of day (0-23 UTC)."""
return bars['timestamp'].dt.hour
- Captures intraday seasonality
- 24-hour crypto cycles
- Importance: 4.91%
6. VWAP Features (1 feature, embedded in signals)
vwap_deviation: Percent Deviation
def vwap_deviation(bars):
"""Percentage deviation from VWAP."""
close = bars['close'].shift(1)
vwap_20 = (bars['vwap'].shift(1).rolling(20).mean())
return ((close - vwap_20) / vwap_20 * 100)
- Price relative to fair value
- Negative = oversold opportunity
- Importance: Embedded in entry signals
7. ATR Features (1 feature, for stops)
atr_stops: Dynamic Stop Sizing
def atr_stops(bars, period=14):
    """Average True Range for stop-loss sizing."""
    high = bars['high'].shift(1)
    low = bars['low'].shift(1)
    close = bars['close'].shift(1)
    tr1 = high - low
    tr2 = (high - close.shift(1)).abs()
    tr3 = (low - close.shift(1)).abs()
    # Element-wise max across the three true-range candidates
    # (built-in max() does not work element-wise on Series)
    tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
    atr = tr.rolling(period).mean()
    return atr * 1.0  # 1.0x multiplier
- Dynamic risk/reward sizing
- Scales with market volatility
- Important: Controls position exposure
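The bullets above translate an ATR reading into concrete barrier prices. A minimal sketch for a long entry with the 1.0x multiplier used here (the $50,000 entry and $450 ATR are hypothetical inputs):

```python
def atr_barriers(entry_price: float, atr: float, multiplier: float = 1.0):
    """Return (stop_loss, take_profit) prices for a long position."""
    offset = atr * multiplier
    return entry_price - offset, entry_price + offset

stop, target = atr_barriers(entry_price=50_000.0, atr=450.0)
print(stop, target)  # -> 49550.0 50450.0
```

A wider multiplier moves both barriers out symmetrically, so risk per trade scales with prevailing volatility rather than a fixed percent.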
Feature Computation Code Example
import pandas as pd
from sklearn.preprocessing import StandardScaler
def compute_features(bars_df):
"""
Compute all 17 features for dollar bars.
Args:
bars_df: DataFrame with OHLCV columns
Returns:
features_scaled: (N, 17) feature matrix, normalized
scaler: Fitted StandardScaler for production use
"""
df = bars_df.copy()
# Price features
df['ret_1'] = df['close'].shift(1).pct_change()
df['ret_3'] = (df['close'].shift(1) / df['close'].shift(3) - 1)
df['ret_5'] = (df['close'].shift(1) / df['close'].shift(5) - 1)
df['ret_accel'] = df['ret_1'].diff()
high_20 = df['high'].shift(1).rolling(20).max()
low_20 = df['low'].shift(1).rolling(20).min()
df['close_pos'] = (df['close'].shift(1) - low_20) / (high_20 - low_20)
# Volume features
df['vol_20'] = df['volume'].shift(1).rolling(20).mean()
df['high_vol'] = (df['volume'].shift(1) > df['vol_20'] * 1.5).astype(int)
df['low_vol'] = (df['volume'].shift(1) < df['vol_20'] * 0.7).astype(int)
# Volatility features (RSI)
close = df['close'].shift(1)
delta = close.diff()
gain = delta.where(delta > 0, 0).rolling(14).mean()
loss = -delta.where(delta < 0, 0).rolling(14).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
df['rsi_oversold'] = (rsi < 30).astype(int)
df['rsi_neutral'] = ((rsi >= 30) & (rsi <= 70)).astype(int)
# MACD
ema12 = close.ewm(span=12, adjust=False).mean()
ema26 = close.ewm(span=26, adjust=False).mean()
macd = ema12 - ema26
df['macd_positive'] = (macd > 0).astype(int)
    # Time-of-day (assumes bars_df carries a DatetimeIndex)
    df['london_open'] = ((df.index.hour == 8) & (df.index.minute < 30)).astype(int)
    df['london_close'] = ((df.index.hour == 16) & (df.index.minute >= 30)).astype(int)
    df['nyse_open'] = ((df.index.hour == 13) & (df.index.minute >= 30)).astype(int)
    df['hour'] = df.index.hour
    # VWAP deviation (shift(1): use the prior bar's VWAP, no look-ahead)
    df['vwap_deviation'] = ((df['close'].shift(1) - df['vwap'].shift(1))
                            / df['vwap'].shift(1) * 100)
# ATR
high = df['high'].shift(1)
low = df['low'].shift(1)
close_lag = df['close'].shift(2)
tr1 = high - low
tr2 = (high - close_lag).abs()
tr3 = (low - close_lag).abs()
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
df['atr_stops'] = tr.rolling(14).mean() * 1.0
# Select feature columns (order critical!)
feature_cols = [
'ret_1', 'ret_3', 'ret_5', 'ret_accel', 'close_pos',
'vol_20', 'high_vol', 'low_vol',
'rsi_oversold', 'rsi_neutral', 'macd_positive',
'london_open', 'london_close', 'nyse_open', 'hour',
'vwap_deviation', 'atr_stops'
]
    features = df[feature_cols].dropna()
    # Normalize. NOTE: fit the scaler on the training split only and reuse
    # it at inference time; fitting on the full history leaks test data.
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)
    return features_scaled, scaler, features.index
Look-Ahead Bias Prevention
Critical: All features use .shift(1) ensuring historical data only.
# WRONG - uses current bar close
df['ma_20'] = df['close'].rolling(20).mean() # Future data!
# CORRECT - uses previous bar close
df['ma_20'] = df['close'].shift(1).rolling(20).mean() # Historical only
This ensures:
- Feature at time T uses only data available at time T-1
- No look-ahead bias in model training
- True out-of-sample validation possible
- Realistic live trading performance
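The shift(1) convention can be verified on a toy series: the feature value at each row depends only on earlier closes, never the current one.

```python
import pandas as pd

df = pd.DataFrame({'close': [100.0, 102.0, 104.0, 106.0]})
df['ma_2'] = df['close'].shift(1).rolling(2).mean()
# Row 3's feature averages the closes at rows 1 and 2 (102, 104) -> 103.0;
# the current row's close (106) is never used.
print(df['ma_2'].tolist())  # -> [nan, nan, 101.0, 103.0]
```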
Model Training & Optimization
Training Data Preparation
def prepare_training_data(bars_df, test_split=0.25):
"""
Prepare features and labels for model training.
Args:
bars_df: Dollar bars with OHLCV
        test_split: Fraction of samples held out for the test set
    Returns:
        X_train, X_test, y_train, y_test, scaler: Feature/label splits plus fitted scaler
"""
# Compute features (see above)
X, scaler, timestamps = compute_features(bars_df)
# Generate labels using triple barrier method
    y = generate_labels(bars_df.loc[timestamps])
# Train/test split (temporal - no shuffling!)
split_idx = int(len(X) * (1 - test_split))
X_train = X[:split_idx]
X_test = X[split_idx:]
y_train = y[:split_idx]
y_test = y[split_idx:]
return X_train, X_test, y_train, y_test, scaler
Triple Barrier Labeling
Each sample gets a binary label (0/1) based on price movement:
import numpy as np

def generate_labels(bars, upper_barrier=0.015, lower_barrier=-0.015,
                    max_bars=42):
    """
    Triple barrier labeling: 1 if the upper barrier is hit first,
    0 if the lower barrier is hit first, else classify at the
    max_bars timeout by close vs entry.
    Args:
        bars: Dollar bars
        upper_barrier: +1.5% profit target (1.5x ATR)
        lower_barrier: -1.5% stop loss (1.5x ATR)
        max_bars: Timeout after 42 bars
    Returns:
        labels: Binary array (1=up, 0=down/neutral)
    """
    labels = []
    for i in range(len(bars)):
        entry_price = bars['close'].iloc[i]
        # Scan forward up to max_bars
        for j in range(i + 1, min(i + max_bars + 1, len(bars))):
            high = bars['high'].iloc[j]
            low = bars['low'].iloc[j]
            # Check upper barrier (take profit)
            if high >= entry_price * (1 + upper_barrier):
                labels.append(1)  # Win
                break
            # Check lower barrier (stop loss)
            if low <= entry_price * (1 + lower_barrier):
                labels.append(0)  # Loss
                break
        else:
            # Timeout or end of data: classify by last available close
            exit_idx = min(i + max_bars, len(bars) - 1)
            if bars['close'].iloc[exit_idx] >= entry_price:
                labels.append(1)
            else:
                labels.append(0)
    return np.array(labels)
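To make the barrier arithmetic concrete: for a hypothetical $50,000 entry with the default ±1.5% barriers, the sample is labeled 1 if a later bar's high reaches the upper level before any low touches the lower one, 0 if the stop is hit first, and by close-vs-entry at the 42-bar timeout.

```python
entry = 50_000.0
barrier = 0.015
upper = entry * (1 + barrier)  # take-profit level, ~$50,750
lower = entry * (1 - barrier)  # stop-loss level,  ~$49,250
print(upper, lower)
```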
XGBoost Training with Optuna
from optuna import create_study, Trial
from xgboost import XGBClassifier
def objective(trial: Trial, X_train, X_test, y_train, y_test):
"""Optuna objective function."""
# Hyperparameter search space
params = {
        'n_estimators': trial.suggest_int('n_estimators', 500, 3000, step=100),
'max_depth': trial.suggest_int('max_depth', 4, 10),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
'subsample': trial.suggest_float('subsample', 0.6, 1.0),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
}
    model = XGBClassifier(**params, random_state=42, n_jobs=-1,
                          early_stopping_rounds=50)  # constructor kwarg (XGBoost >= 1.6)
    model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)
    # Evaluate on test set
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]
    # Pseudo-Sharpe over hit/miss outcomes (+1 for wins, -1 for losses)
    returns = (y_pred == y_test).astype(int) * 2 - 1
    sharpe = returns.mean() / (returns.std() + 1e-6)
    return sharpe
def train_optimal_model(X_train, X_test, y_train, y_test, n_trials=1000):
"""Train model with optimal hyperparameters."""
study = create_study(direction='maximize')
study.optimize(
lambda trial: objective(trial, X_train, X_test, y_train, y_test),
n_trials=n_trials,
show_progress_bar=True
)
# Train final model with best parameters
best_params = study.best_params
best_model = XGBClassifier(**best_params, random_state=42, n_jobs=-1)
best_model.fit(X_train, y_train)
return best_model, study
Model Evaluation
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score
def evaluate_model(model, X_test, y_test):
"""Evaluate model performance metrics."""
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]
cm = confusion_matrix(y_test, y_pred)
accuracy = (y_pred == y_test).mean()
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# Trading metrics
wins = (y_pred == 1) & (y_test == 1)
losses = (y_pred == 1) & (y_test == 0)
win_rate = wins.sum() / (wins.sum() + losses.sum()) if (wins.sum() + losses.sum()) > 0 else 0
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1:.4f}")
print(f"Win Rate: {win_rate:.2%}")
print(f"Confusion Matrix:\n{cm}")
return {'accuracy': accuracy, 'precision': precision, 'recall': recall,
'f1': f1, 'win_rate': win_rate, 'confusion_matrix': cm}
Signal Generation Logic
Entry Conditions
Trial 244 Alpha requires a minimum of 2 of 3 confirming signals:
def generate_signal(features_current, model, scaler, config):
    """
    Generate trading signal based on model prediction and confirmations.
    Args:
        features_current: Current bar's raw (unscaled) feature vector
        model: Trained XGBClassifier
        scaler: StandardScaler for normalization
        config: Trial 244 Alpha parameters
    Returns:
        signal: 1 (buy) or 0 (hold), confidence: 0.0-1.0, signal breakdown
    """
    # Scale features
    features_scaled = scaler.transform(features_current.reshape(1, -1))
    # Model prediction
    model_signal = model.predict(features_scaled)[0]
    confidence = model.predict_proba(features_scaled)[0][1]
    # Signal 1: Momentum threshold (mean reversion)
    momentum = features_current[0]  # ret_1
    momentum_signal = int(momentum <= config['momentum_threshold'])
    # Signal 2: Volume confirmation. The feature vector carries no raw
    # current-bar volume, so the high_vol spike flag (index 6) stands in
    # for a volume ratio.
    volume_signal = int(features_current[6] == 1)  # high_vol
    # Signal 3: VWAP deviation (price discount)
    vwap_dev = features_current[15]  # vwap_deviation
    vwap_signal = int(vwap_dev <= config['vwap_dev_threshold'])
    # Combine signals: need min_signals (2+) to trigger
    signal_sum = momentum_signal + volume_signal + vwap_signal
    buy_signal = 1 if signal_sum >= config['min_signals'] else 0
    # Confidence scaling: more confirmations, more confidence
    if buy_signal == 1:
        confidence = min(1.0, confidence + (signal_sum - 2) * 0.1)
    return buy_signal, confidence, {
        'momentum': momentum_signal,
        'volume': volume_signal,
        'vwap': vwap_signal,
        'total_signals': signal_sum
    }
Position Sizing by Confidence
def calculate_position_size(confidence, base_size=0.01, config=None):
"""
Scale position size by model confidence.
Args:
confidence: Model prediction probability (0.0-1.0)
base_size: Base position (1% of capital)
        config: Trial 244 Alpha config with sizing rules
Returns:
position_size: Actual position in percent of capital
"""
if confidence < 0.55:
return 0.0 # Below threshold, skip trade
elif confidence < 0.60:
return base_size * 0.25
elif confidence < 0.65:
return base_size * 0.50
elif confidence < 0.70:
return base_size * 0.75
else:
return base_size # Full position at 70%+ confidence
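The tier boundaries above can be exercised directly. The helper below re-states calculate_position_size so the snippet is self-contained; sizes are fractions of capital with a 1% base.

```python
def size_for(confidence: float, base_size: float = 0.01) -> float:
    """Mirror of calculate_position_size above (fractions of capital)."""
    if confidence < 0.55:
        return 0.0              # below the confidence floor: skip the trade
    elif confidence < 0.60:
        return base_size * 0.25
    elif confidence < 0.65:
        return base_size * 0.50
    elif confidence < 0.70:
        return base_size * 0.75
    return base_size            # full position at 70%+ confidence

# One probe per tier
for c in (0.50, 0.57, 0.62, 0.68, 0.80):
    print(c, size_for(c))
```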
Exit Conditions (Triple Barrier)
def check_exit_condition(entry_price, current_price, bars_held,
atr_value, config):
"""
Check if position should be exited.
Args:
entry_price: Price at entry
current_price: Current market price
bars_held: Number of bars since entry
atr_value: Current ATR for dynamic stops
        config: Trial 244 Alpha configuration
Returns:
exit_type: 'none', 'profit', 'loss', 'timeout'
"""
atr_stop = atr_value * config['atr_multiplier']
# Barrier 1: Take profit
if current_price >= entry_price + atr_stop:
return 'profit'
# Barrier 2: Stop loss
if current_price <= entry_price - atr_stop:
return 'loss'
# Barrier 3: Timeout
if bars_held >= config['holding_period']:
return 'timeout'
return 'none'
Risk Management Framework
6-Layer Risk Enforcement
class RiskManager:
"""Independent risk management enforcement."""
def __init__(self, initial_capital=100_000, config=None):
self.capital = initial_capital
self.peak_capital = initial_capital
self.daily_pnl = 0
self.weekly_pnl = 0
self.positions = []
self.config = config or self._default_config()
@staticmethod
def _default_config():
return {
'max_position_size': 0.10, # 10% of capital
'max_daily_loss': -0.05, # 5% daily loss limit
'max_weekly_loss': -0.10, # 10% weekly loss
'max_drawdown': -0.15, # 15% drawdown limit
'min_confidence': 0.55, # Model confidence floor
'max_volatility': 0.10, # 10% volatility threshold
'max_spread_bps': 50, # 50 bps max spread
}
def check_entry_allowed(self, signal, confidence, current_volatility,
bid_ask_spread_bps):
"""Layer 1: Pre-trade authorization."""
checks = {
'signal_present': signal == 1,
'confidence_ok': confidence >= self.config['min_confidence'],
'volatility_ok': current_volatility <= self.config['max_volatility'],
'spread_ok': bid_ask_spread_bps <= self.config['max_spread_bps'],
            'daily_loss_ok': self.daily_pnl >= self.config['max_daily_loss'] * self.capital,  # limit is a fraction of capital
'position_limit_ok': len(self.positions) < 3, # Max 3 concurrent
}
allowed = all(checks.values())
return allowed, checks
def monitor_position(self, position):
"""Layer 2-6: Ongoing position monitoring."""
exit_type = 'none'
# Layer 2: Position limit
if position['exposure'] > self.capital * self.config['max_position_size']:
exit_type = 'position_limit'
# Layer 3: Stop loss hit
if position['current_price'] <= position['stop_loss']:
exit_type = 'stop_loss'
# Layer 4: Take profit hit
if position['current_price'] >= position['take_profit']:
exit_type = 'take_profit'
        # Layer 5: Daily loss breach (limit is a fraction of capital)
        if self.daily_pnl + position['unrealized_pnl'] <= self.config['max_daily_loss'] * self.capital:
            exit_type = 'daily_loss_limit'
# Layer 6: Drawdown limit
if self.capital / self.peak_capital <= (1 + self.config['max_drawdown']):
exit_type = 'max_drawdown'
return exit_type
def update_pnl(self, position):
"""Update capital and P&L."""
pnl = position['pnl']
self.capital += pnl
self.daily_pnl += pnl
self.peak_capital = max(self.peak_capital, self.capital)
return self.capital
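As an illustration of the Layer 6 arithmetic: with max_drawdown = -0.15, entries halt once equity falls to 85% of its peak. The capital figures below are hypothetical.

```python
peak_capital = 120_000.0      # hypothetical peak equity
max_drawdown = -0.15          # 15% drawdown limit, expressed as a fraction
halt_level = peak_capital * (1 + max_drawdown)  # ~ $102,000

capital = 101_500.0           # hypothetical current equity
breached = capital / peak_capital <= (1 + max_drawdown)
print(breached)  # -> True
```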
Data Processing Pipeline
Real-Time Feature Computation
class FeatureEngine:
"""Real-time feature computation with rolling windows."""
def __init__(self, window_size=50):
self.window = pd.DataFrame(columns=['open', 'high', 'low', 'close', 'volume'])
self.window_size = window_size
def add_bar(self, bar):
"""Add new dollar bar to rolling window."""
self.window = pd.concat([self.window, pd.DataFrame([bar])], ignore_index=True)
# Keep only last N bars
if len(self.window) > self.window_size:
self.window = self.window.iloc[-(self.window_size):]
return self.compute_features()
    def compute_features(self):
        """Compute the price/volume subset of the 17 features from the window."""
        if len(self.window) < self.window_size:
            return None  # Not enough data
        features = {}
        # The window holds completed bars only, so the last row already is
        # the "previous" bar; no extra shift(1) is needed here (this mirrors
        # the shifted batch computation in compute_features above).
        close = self.window['close']
        features['ret_1'] = close.pct_change().iloc[-1]
        features['ret_3'] = close.iloc[-1] / close.iloc[-3] - 1
        features['ret_5'] = close.iloc[-1] / close.iloc[-5] - 1
        features['ret_accel'] = close.pct_change().diff().iloc[-1]
        high_20 = self.window['high'].iloc[-20:].max()
        low_20 = self.window['low'].iloc[-20:].min()
        features['close_pos'] = (close.iloc[-1] - low_20) / (high_20 - low_20)
        # Volume features
        vol_20 = self.window['volume'].iloc[-20:].mean()
        features['vol_20'] = vol_20
        features['high_vol'] = 1 if self.window['volume'].iloc[-1] > vol_20 * 1.5 else 0
        features['low_vol'] = 1 if self.window['volume'].iloc[-1] < vol_20 * 0.7 else 0
        # [Additional feature computations...]
        return np.array(list(features.values()))
Deployment Architecture
AWS Infrastructure
# ECS Task Definition
service: model-serving
cpu: 2048
memory: 4096
containers:
- image: quantflux/inference:latest
ports:
- 8000
environment:
MODEL_PATH: s3://quantflux-models/trial_244_xgb.pkl
SCALER_PATH: s3://quantflux-models/scaler.pkl
healthCheck:
command: ['CMD', 'curl', 'localhost:8000/health']
interval: 30
timeout: 5
retries: 3
# Auto-scaling
desiredCount: 3
scalingPolicy:
targetCPU: 70%
targetMemory: 80%
minTasks: 1
maxTasks: 10
# Load balancing
alb:
targetGroup: quantflux-inference
healthCheckPath: /health
healthCheckInterval: 30s
Inference Latency Targets
- Feature computation: <20ms (50-bar rolling window)
- Model inference: <30ms (XGBoost prediction)
- Risk checks: <10ms (6-layer enforcement)
- Order routing: <30ms (to exchange)
- Total end-to-end: <100ms (sub-100ms requirement)
References
- de Prado, M. L. (2018). "Advances in Financial Machine Learning"
- Aronson, D. (2007). "Evidence-Based Technical Analysis"
- Cryptofeed: Real-time crypto data library
- XGBoost Documentation: https://xgboost.readthedocs.io/
- VectorBT: Backtesting framework
Document Version: 1.0 Updated: 2025-11-19 Author: QuantFlux Research Team