QuantFlux-Trial244-BTC / TECHNICAL_ARCHITECTURE.md
FINAL: Proprietary (Zackariah Grogan), Alpha/Test model, no commercial use

QuantFlux Alpha (Test Model for 3.0): Technical Architecture & Implementation Details

Table of Contents

  1. System Overview
  2. Dollar Bar Aggregation
  3. Feature Engineering Pipeline
  4. Model Training & Optimization
  5. Signal Generation Logic
  6. Risk Management Framework
  7. Data Processing Pipeline
  8. Deployment Architecture

System Overview

End-to-End Flow

Binance WebSocket (Real-time ticks)
        ↓
Tick Validation & Cleaning
        ↓
Dollar Bar Aggregator ($500k threshold)
        ↓
Feature Computation Engine
        ↓
XGBoost Model Inference
        ↓
Risk Management Checks (6 layers)
        ↓
Order Execution & P&L Tracking

Key Components

1. Data Ingestion Layer

  • Binance perpetual futures WebSocket (BTC/USDT)
  • Tick-level data: price, quantity, side, timestamp
  • Validation: Remove duplicates, out-of-order ticks, invalid quotes

2. Dollar Bar Generator

  • Aggregates ticks by dollar volume ($500,000 per bar)
  • Eliminates look-ahead bias (timestamps at bar completion)
  • Reduces autocorrelation by 10-20% vs time bars
  • Output: OHLCV candles with derived metrics (VWAP, ATR, RSI)

3. Feature Engineering

  • 17 core features from dollar bars
  • Computation: 50-bar rolling windows (no look-ahead)
  • Normalization: StandardScaler (mean=0, std=1)
  • Output: Feature vectors for model inference

4. Model Inference

  • XGBoost classifier (2,000 trees, depth=7)
  • Input: 17-dimensional feature vector
  • Output: Binary prediction (0/1) + confidence probability
  • Latency: <50ms single prediction (target <100ms batch)

5. Risk Management

  • 6 independent enforcement layers
  • Pre-trade checks (position limits, volatility filters)
  • In-trade monitoring (stop-loss, take-profit)
  • Post-trade validation (daily/weekly loss limits)

6. Execution Engine

  • Paper trading (simulated fills)
  • Order management (entry, stop-loss, take-profit)
  • P&L calculation (slippage-aware)
  • Logging and audit trails

Dollar Bar Aggregation

Motivation

Traditional time-based bars (1H, 4H) introduce biases:

  • Unequal information content (busy hours vs quiet hours)
  • Look-ahead bias (closing price depends on future ticks)
  • Autocorrelation (nearby bars correlated)

Dollar bars fix this: Sample by volume, not time.

Algorithm

from math import inf

def create_dollar_bar(ticks, dollar_threshold=500_000):
    """
    Aggregate ticks into bars by cumulative dollar volume.

    Args:
        ticks: Iterable of (price, quantity, timestamp) tuples
        dollar_threshold: Dollar volume per bar (e.g., $500k)

    Returns:
        bars: List of dicts (open, high, low, close, volume,
              vwap, timestamp, duration_seconds). Any trailing
              partial bar is discarded.
    """
    bars = []
    dollar_volume = 0
    bar_open = None
    bar_high = -inf
    bar_low = inf
    bar_volume = 0
    vwap_num = 0  # Numerator: sum(price * volume)
    vwap_den = 0  # Denominator: sum(volume)
    bar_start_time = None

    for price, quantity, timestamp in ticks:
        dollar_value = price * quantity

        # Initialize bar on first tick
        if bar_open is None:
            bar_open = price
            bar_start_time = timestamp

        # Update bar statistics
        bar_high = max(bar_high, price)
        bar_low = min(bar_low, price)
        bar_volume += quantity
        dollar_volume += dollar_value
        vwap_num += price * quantity
        vwap_den += quantity

        # Check if bar complete
        if dollar_volume >= dollar_threshold:
            vwap = vwap_num / vwap_den
            bar_close = price

            bars.append({
                'timestamp': timestamp,  # Last tick time (no look-ahead)
                'open': bar_open,
                'high': bar_high,
                'low': bar_low,
                'close': bar_close,
                'volume': bar_volume,
                'vwap': vwap,
                'duration_seconds': timestamp - bar_start_time
            })

            # Reset for next bar
            dollar_volume = 0
            bar_open = None
            bar_high = -inf
            bar_low = inf
            bar_volume = 0
            vwap_num = 0
            vwap_den = 0

    return bars
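For offline research, the same aggregation can be approximated in vectorized pandas. This is a hypothetical helper, not part of the production pipeline: because the cumulative dollar volume is never reset at bar boundaries, bar sizes can drift slightly from the exact threshold, and the trailing partial bar is emitted rather than discarded.

```python
import pandas as pd

def dollar_bars_vectorized(ticks: pd.DataFrame, threshold: float = 500_000):
    """Approximate dollar bars by binning the running dollar volume."""
    dollar = ticks['price'] * ticks['quantity']
    # Each tick falls into the bin of its cumulative dollar volume
    bar_id = (dollar.cumsum() // threshold).astype(int)

    grouped = ticks.groupby(bar_id)
    bars = pd.DataFrame({
        'open': grouped['price'].first(),
        'high': grouped['price'].max(),
        'low': grouped['price'].min(),
        'close': grouped['price'].last(),
        'volume': grouped['quantity'].sum(),
    })
    # VWAP = total dollar volume / total base volume per bar
    bars['vwap'] = dollar.groupby(bar_id).sum() / bars['volume']
    return bars
```

The streaming loop above remains the right tool for live data; the vectorized form is convenient for backfilling history.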

Advantages

  1. Look-Ahead Prevention: Timestamped at last tick, no future data used
  2. Information Normalization: Each bar represents ~equal market participation
  3. Autocorrelation Reduction: High-volume periods produce more bars, spreading correlation
  4. Microstructure Preservation: Captures real-time liquidity patterns

Threshold Selection

  • $500k per bar: Balances granularity vs bar frequency
    • At BTC $50k: ~10 BTC traded per bar
    • Produces ~50-100 bars/day in active periods
    • Bar duration adapts to activity (seconds during bursts, minutes when quiet)
  • Alternative thresholds:
    • $100k: Very granular, ~200-300 bars/day (high noise)
    • $1M: Coarse, ~20-30 bars/day (low signal resolution)
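The bar-frequency figures above follow directly from dividing daily dollar volume by the threshold. A back-of-envelope sketch (the daily volume figure is illustrative only, chosen to reproduce the document's bar counts):

```python
def expected_bars_per_day(daily_dollar_volume: float, threshold: float) -> float:
    """Expected number of dollar bars per day at a given threshold."""
    return daily_dollar_volume / threshold

# Illustrative: $37.5M/day of flow at the $500k threshold gives ~75 bars/day,
# in the middle of the 50-100 range quoted above.
assert expected_bars_per_day(37_500_000, 500_000) == 75.0
# The same flow at $100k gives 375 bars/day (granular), at $1M only ~38.
```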

Feature Engineering Pipeline

Feature Categories

1. Price Action (5 features)

ret_1: Lag-1 Return

def ret_1(bars):
    """Single bar return (momentum)."""
    close = bars['close'].shift(1)  # No look-ahead
    return close.pct_change()
  • Captures immediate momentum
  • Used for mean-reversion identification
  • Importance: 4.93%

ret_3: 3-Bar Return

def ret_3(bars):
    """3-bar cumulative return."""
    close = bars['close'].shift(1)
    return (close / close.shift(2) - 1)
  • Medium-term trend confirmation
  • Smooths out single-bar noise
  • Importance: 4.95%

ret_5: 5-Bar Return

def ret_5(bars):
    """5-bar cumulative return."""
    close = bars['close'].shift(1)
    return (close / close.shift(4) - 1)
  • Regime identification (bull vs bear)
  • Filters out short-term noise
  • Importance: 4.96%

ret_accel: Return Acceleration

def ret_accel(bars):
    """Change in momentum (second derivative)."""
    close = bars['close'].shift(1)
    ret = close.pct_change()
    return ret.diff()
  • Detects momentum reversals
  • Peaks before trend changes
  • Importance: 4.99%

close_pos: Close Position

def close_pos(bars):
    """Position of close within 20-bar range."""
    close = bars['close'].shift(1)
    high_20 = bars['high'].shift(1).rolling(20).max()
    low_20 = bars['low'].shift(1).rolling(20).min()
    return (close - low_20) / (high_20 - low_20)
  • 0.0: At 20-bar low (oversold)
  • 1.0: At 20-bar high (overbought)
  • 0.5: Neutral (mean reversion opportunity)
  • Importance: 4.82%

2. Volume Features (3 features)

vol_20: 20-Bar Volume Mean

def vol_20(bars):
    """Expected volume baseline."""
    return bars['volume'].shift(1).rolling(20).mean()
  • Baseline for volume anomalies
  • Normalized by market regime
  • Importance: 5.08%

high_vol: Volume Spike

def high_vol(bars):
    """Binary: volume above 1.5x average."""
    volume = bars['volume'].shift(1)
    vol_mean = volume.rolling(20).mean()
    return (volume > vol_mean * 1.5).astype(int)
  • Confidence flag for trades
  • Indicates institutional activity
  • Importance: 4.74%

low_vol: Volume Drought

def low_vol(bars):
    """Binary: volume below 0.7x average."""
    volume = bars['volume'].shift(1)
    vol_mean = volume.rolling(20).mean()
    return (volume < vol_mean * 0.7).astype(int)
  • Risk flag (thin liquidity)
  • May precede price gaps
  • Importance: 4.80%

3. Volatility Features (2 features)

rsi_oversold: RSI < 30

def rsi_oversold(bars):
    """Relative Strength Index oversold condition."""
    close = bars['close'].shift(1)
    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).rolling(14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    return (rsi < 30).astype(int)
  • Oversold = bounce opportunity
  • 14-period standard RSI calculation
  • Importance: 5.07%

rsi_neutral: 30 <= RSI <= 70

def rsi_neutral(bars):
    """RSI in neutral zone (no extreme conditions)."""
    close = bars['close'].shift(1)
    delta = close.diff()
    gain = delta.where(delta > 0, 0).rolling(14).mean()
    loss = -delta.where(delta < 0, 0).rolling(14).mean()
    rsi = 100 - (100 / (1 + gain / loss))
    return ((rsi >= 30) & (rsi <= 70)).astype(int)
  • Normal volatility regime
  • Avoid extreme conditions
  • Importance: 5.14% (highest!)

4. MACD Features (1 feature)

macd_positive: MACD > 0

def macd_positive(bars):
    """Moving Average Convergence Divergence signal."""
    close = bars['close'].shift(1)
    ema12 = close.ewm(span=12, adjust=False).mean()
    ema26 = close.ewm(span=26, adjust=False).mean()
    macd = ema12 - ema26
    return (macd > 0).astype(int)
  • Bullish trend confirmation
  • Cross above zero = potential buy
  • Importance: 4.77%

5. Time-of-Day Features (4 features)

london_open: London Session Open

def london_open(bars):
    """Binary: London open window, 08:00-08:30 UTC."""
    hour = bars['timestamp'].dt.hour
    minute = bars['timestamp'].dt.minute
    is_london = (hour == 8) & (minute < 30)
    return is_london.astype(int)
  • Highest daily volatility period
  • Institutional orders flow
  • Importance: 5.08%

london_close: London Session Close

def london_close(bars):
    """Binary: 16:30 UTC ±30 minutes."""
    hour = bars['timestamp'].dt.hour
    minute = bars['timestamp'].dt.minute
    is_close = (hour == 16) & (minute >= 30) & (minute < 60)
    return is_close.astype(int)
  • Position unwinding
  • End-of-session volatility
  • Importance: 4.70%

nyse_open: NYSE Open

def nyse_open(bars):
    """Binary: 13:30 UTC ±30 minutes."""
    hour = bars['timestamp'].dt.hour
    minute = bars['timestamp'].dt.minute
    is_open = (hour == 13) & (minute >= 30) & (minute < 60)
    return is_open.astype(int)
  • Equities-crypto correlation spike
  • Derivative hedging flows
  • Importance: 5.02%

hour: Hour of Day

def hour(bars):
    """Hour of day (0-23 UTC)."""
    return bars['timestamp'].dt.hour
  • Captures intraday seasonality
  • 24-hour crypto cycles
  • Importance: 4.91%

6. VWAP Features (1 feature, embedded in signals)

vwap_deviation: Percent Deviation

def vwap_deviation(bars):
    """Percentage deviation from VWAP."""
    close = bars['close'].shift(1)
    vwap_20 = (bars['vwap'].shift(1).rolling(20).mean())
    return ((close - vwap_20) / vwap_20 * 100)
  • Price relative to fair value
  • Negative = oversold opportunity
  • Importance: Embedded in entry signals

7. ATR Features (1 feature, for stops)

atr_stops: Dynamic Stop Sizing

import pandas as pd

def atr_stops(bars, period=14):
    """Average True Range for stop-loss sizing."""
    high = bars['high'].shift(1)
    low = bars['low'].shift(1)
    close_prev = bars['close'].shift(2)  # close one bar before the lagged high/low

    tr1 = high - low
    tr2 = (high - close_prev).abs()
    tr3 = (low - close_prev).abs()
    # Element-wise max across the three components (Python's max() would
    # compare whole Series and fail)
    tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)

    atr = tr.rolling(period).mean()
    return atr * 1.0  # 1.0x multiplier
  • Dynamic risk/reward sizing
  • Scales with market volatility
  • Importance: controls position exposure (also a model input)

Feature Computation Code Example

import pandas as pd
from sklearn.preprocessing import StandardScaler

def compute_features(bars_df):
    """
    Compute all 17 features for dollar bars.

    Args:
        bars_df: DataFrame with OHLCV columns

    Returns:
        features_scaled: (N, 17) feature matrix, normalized
        scaler: Fitted StandardScaler for production use
        index: Timestamps of the rows that survive dropna()
    """
    df = bars_df.copy()

    # Price features
    df['ret_1'] = df['close'].shift(1).pct_change()
    df['ret_3'] = (df['close'].shift(1) / df['close'].shift(3) - 1)
    df['ret_5'] = (df['close'].shift(1) / df['close'].shift(5) - 1)
    df['ret_accel'] = df['ret_1'].diff()

    high_20 = df['high'].shift(1).rolling(20).max()
    low_20 = df['low'].shift(1).rolling(20).min()
    df['close_pos'] = (df['close'].shift(1) - low_20) / (high_20 - low_20)

    # Volume features
    df['vol_20'] = df['volume'].shift(1).rolling(20).mean()
    df['high_vol'] = (df['volume'].shift(1) > df['vol_20'] * 1.5).astype(int)
    df['low_vol'] = (df['volume'].shift(1) < df['vol_20'] * 0.7).astype(int)

    # Volatility features (RSI)
    close = df['close'].shift(1)
    delta = close.diff()
    gain = delta.where(delta > 0, 0).rolling(14).mean()
    loss = -delta.where(delta < 0, 0).rolling(14).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    df['rsi_oversold'] = (rsi < 30).astype(int)
    df['rsi_neutral'] = ((rsi >= 30) & (rsi <= 70)).astype(int)

    # MACD
    ema12 = close.ewm(span=12, adjust=False).mean()
    ema26 = close.ewm(span=26, adjust=False).mean()
    macd = ema12 - ema26
    df['macd_positive'] = (macd > 0).astype(int)

    # Time-of-day (assumes a DatetimeIndex of bar timestamps)
    df['london_open'] = ((df.index.hour == 8) & (df.index.minute < 30)).astype(int)
    df['london_close'] = ((df.index.hour == 16) & (df.index.minute >= 30)).astype(int)
    df['nyse_open'] = ((df.index.hour == 13) & (df.index.minute >= 30)).astype(int)
    df['hour'] = df.index.hour

    # VWAP deviation (vs lagged 20-bar rolling VWAP, matching the feature definition)
    vwap_20 = df['vwap'].shift(1).rolling(20).mean()
    df['vwap_deviation'] = ((df['close'].shift(1) - vwap_20) / vwap_20 * 100)

    # ATR
    high = df['high'].shift(1)
    low = df['low'].shift(1)
    close_lag = df['close'].shift(2)
    tr1 = high - low
    tr2 = (high - close_lag).abs()
    tr3 = (low - close_lag).abs()
    tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
    df['atr_stops'] = tr.rolling(14).mean() * 1.0

    # Select feature columns (order critical!)
    feature_cols = [
        'ret_1', 'ret_3', 'ret_5', 'ret_accel', 'close_pos',
        'vol_20', 'high_vol', 'low_vol',
        'rsi_oversold', 'rsi_neutral', 'macd_positive',
        'london_open', 'london_close', 'nyse_open', 'hour',
        'vwap_deviation', 'atr_stops'
    ]

    features = df[feature_cols].dropna()

    # Normalize
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)

    return features_scaled, scaler, features.index

Look-Ahead Bias Prevention

Critical: all features apply .shift(1), so they are computed from historical data only.

# WRONG - uses current bar close
df['ma_20'] = df['close'].rolling(20).mean()  # Future data!

# CORRECT - uses previous bar close
df['ma_20'] = df['close'].shift(1).rolling(20).mean()  # Historical only

This ensures:

  1. Feature at time T uses only data available at time T-1
  2. No look-ahead bias in model training
  3. True out-of-sample validation possible
  4. Realistic live trading performance
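The alignment is easy to verify on a toy series: with .shift(1), the rolling statistic at bar T is built entirely from bars T-1 and earlier.

```python
import pandas as pd

close = pd.Series([10.0, 11.0, 12.0, 13.0, 14.0])

# WRONG: the value at index t includes close[t] itself
ma_leaky = close.rolling(3).mean()

# CORRECT: the value at index t sees only close[t-1] and earlier
ma_lagged = close.shift(1).rolling(3).mean()

assert ma_leaky.iloc[2] == (10 + 11 + 12) / 3   # uses close[2] -> leak
assert ma_lagged.iloc[3] == (10 + 11 + 12) / 3  # only data up to t-1
```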

Model Training & Optimization

Training Data Preparation

def prepare_training_data(bars_df, test_split=0.25):
    """
    Prepare features and labels for model training.

    Args:
        bars_df: Dollar bars with OHLCV
        test_split: Fraction of samples held out for testing

    Returns:
        X_train, X_test, y_train, y_test, scaler
    """
    # Compute features (see above)
    X, scaler, timestamps = compute_features(bars_df)

    # Generate labels using triple barrier method (align bars to surviving feature rows)
    y = generate_labels(bars_df.loc[timestamps])

    # Train/test split (temporal - no shuffling!)
    split_idx = int(len(X) * (1 - test_split))

    X_train = X[:split_idx]
    X_test = X[split_idx:]
    y_train = y[:split_idx]
    y_test = y[split_idx:]

    return X_train, X_test, y_train, y_test, scaler
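One caveat: compute_features above calls fit_transform on the full sample, so test-set statistics leak into the scaler. Given the document's emphasis on look-ahead prevention, a stricter variant (a sketch under the same temporal split, with a hypothetical helper name) fits the scaler on the training slice only:

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

def split_then_scale(X: np.ndarray, test_split: float = 0.25):
    """Temporal split first, then fit the scaler on the training slice only."""
    split_idx = int(len(X) * (1 - test_split))
    X_train_raw, X_test_raw = X[:split_idx], X[split_idx:]

    # Fitting on the training slice keeps test-set means/stds out of training
    scaler = StandardScaler().fit(X_train_raw)
    return scaler.transform(X_train_raw), scaler.transform(X_test_raw), scaler
```

The same fitted scaler is then reused unchanged for live inference.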

Triple Barrier Labeling

Each sample gets a binary label (0/1) based on price movement:

import numpy as np

def generate_labels(bars, upper_barrier=0.015, lower_barrier=-0.015,
                    max_bars=42):
    """
    Triple barrier labeling: 1 if the upper barrier is hit first,
    0 if the lower barrier is hit first or the max_bars timeout
    expires below the entry price.

    Args:
        bars: Dollar bars
        upper_barrier: +1.5% profit target
        lower_barrier: -1.5% stop loss
        max_bars: Timeout after 42 bars

    Returns:
        labels: Binary array (1=up, 0=down/neutral)
    """
    labels = []

    for i in range(len(bars)):
        entry_price = bars['close'].iloc[i]

        # Scan forward up to max_bars
        for j in range(i + 1, min(i + max_bars + 1, len(bars))):
            high = bars['high'].iloc[j]
            low = bars['low'].iloc[j]

            # Check upper barrier (take profit)
            if high >= entry_price * (1 + upper_barrier):
                labels.append(1)  # Win
                break

            # Check lower barrier (stop loss)
            if low <= entry_price * (1 + lower_barrier):
                labels.append(0)  # Loss
                break
        else:
            # Timeout: classify by the last scanned close vs entry
            # (clamped so entries near the end of the sample stay in range)
            last_idx = min(i + max_bars, len(bars) - 1)
            if bars['close'].iloc[last_idx] >= entry_price:
                labels.append(1)
            else:
                labels.append(0)

    return np.array(labels)
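The barrier scan can be illustrated in isolation for a single entry. This minimal helper (hypothetical, with illustrative prices) mirrors the inner loop: like the labeler above, it checks the upper barrier before the lower one when both fall inside the same bar.

```python
def first_barrier_hit(entry, highs, lows, upper=0.015, lower=-0.015):
    """Return 'up', 'down', or 'timeout' for one entry's forward path."""
    up_level = entry * (1 + upper)      # +1.5% profit target
    down_level = entry * (1 + lower)    # -1.5% stop loss
    for high, low in zip(highs, lows):
        if high >= up_level:
            return 'up'
        if low <= down_level:
            return 'down'
    return 'timeout'

# Path rallies 2% before ever dropping 1.5% -> upper barrier wins
assert first_barrier_hit(100.0, highs=[100.5, 102.0], lows=[99.5, 99.0]) == 'up'
```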

XGBoost Training with Optuna

from optuna import create_study, Trial
from xgboost import XGBClassifier

def objective(trial: Trial, X_train, X_test, y_train, y_test):
    """Optuna objective function."""

    # Hyperparameter search space
    params = {
        'n_estimators': trial.suggest_int('n_estimators', 500, 3000, step=100),
        'max_depth': trial.suggest_int('max_depth', 4, 10),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
        'subsample': trial.suggest_float('subsample', 0.6, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
    }

    # early_stopping_rounds is a constructor argument in xgboost >= 1.6
    model = XGBClassifier(**params, random_state=42, n_jobs=-1,
                          early_stopping_rounds=50)
    model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)

    # Evaluate on test set
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]

    # Pseudo-Sharpe proxy: +1 for each correct prediction, -1 for each miss
    returns = (y_pred == y_test).astype(int) * 2 - 1
    sharpe = returns.mean() / (returns.std() + 1e-6)

    return sharpe

def train_optimal_model(X_train, X_test, y_train, y_test, n_trials=1000):
    """Train model with optimal hyperparameters."""

    study = create_study(direction='maximize')
    study.optimize(
        lambda trial: objective(trial, X_train, X_test, y_train, y_test),
        n_trials=n_trials,
        show_progress_bar=True
    )

    # Train final model with best parameters
    best_params = study.best_params
    best_model = XGBClassifier(**best_params, random_state=42, n_jobs=-1)
    best_model.fit(X_train, y_train)

    return best_model, study

Model Evaluation

from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score

def evaluate_model(model, X_test, y_test):
    """Evaluate model performance metrics."""

    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]

    cm = confusion_matrix(y_test, y_pred)
    accuracy = (y_pred == y_test).mean()
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    # Trading metrics
    wins = (y_pred == 1) & (y_test == 1)
    losses = (y_pred == 1) & (y_test == 0)
    win_rate = wins.sum() / (wins.sum() + losses.sum()) if (wins.sum() + losses.sum()) > 0 else 0

    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1:.4f}")
    print(f"Win Rate: {win_rate:.2%}")
    print(f"Confusion Matrix:\n{cm}")

    return {'accuracy': accuracy, 'precision': precision, 'recall': recall,
            'f1': f1, 'win_rate': win_rate, 'confusion_matrix': cm}

Signal Generation Logic

Entry Conditions

Trial 244 Alpha requires a minimum of 2 of 3 signals:

def generate_signal(features_current, model, scaler, config):
    """
    Generate trading signal based on model prediction and confirmations.

    Args:
        features_current: Current bar's feature vector
        model: Trained XGBClassifier
        scaler: StandardScaler for normalization
        config: Trial 244 Alpha parameters

    Returns:
        signal: 1 (buy) or 0 (hold), confidence: 0.0-1.0
    """

    # Scale features
    features_scaled = scaler.transform(features_current.reshape(1, -1))

    # Model prediction
    model_signal = model.predict(features_scaled)[0]
    confidence = model.predict_proba(features_scaled)[0][1]

    # Signal 1: Momentum threshold (mean reversion)
    momentum = features_current[0]  # ret_1
    momentum_signal = int(momentum <= config['momentum_threshold'])

    # Signal 2: Volume confirmation via the high_vol flag (index 6); the raw
    # current-bar volume is not part of the 17-feature vector
    volume_signal = int(features_current[6] == 1)  # high_vol

    # Signal 3: VWAP deviation (price discount)
    vwap_dev = features_current[15]  # vwap_deviation
    vwap_signal = int(vwap_dev <= config['vwap_dev_threshold'])

    # Combine signals: need 2+ to trigger
    signal_sum = momentum_signal + volume_signal + vwap_signal
    buy_signal = 1 if signal_sum >= config['min_signals'] else 0

    # Confidence scaling
    if buy_signal == 1:
        # Higher confidence = more confirmations
        confidence = min(1.0, confidence + (signal_sum - 2) * 0.1)

    return buy_signal, confidence, {
        'momentum': momentum_signal,
        'volume': volume_signal,
        'vwap': vwap_signal,
        'total_signals': signal_sum
    }
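For reference, generate_signal reads four keys from its config. The values below are placeholders for illustration only, not the tuned Trial 244 parameters:

```python
# Illustrative config for generate_signal; every value is a placeholder
example_config = {
    'momentum_threshold': -0.002,  # ret_1 at or below -0.2% (mean reversion)
    'volume_threshold': 1.5,       # volume ratio cutoff vs 20-bar mean
    'vwap_dev_threshold': -0.5,    # at least 0.5% below rolling VWAP
    'min_signals': 2,              # require 2 of 3 confirmations
}

assert example_config['min_signals'] == 2
```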

Position Sizing by Confidence

def calculate_position_size(confidence, base_size=0.01, config=None):
    """
    Scale position size by model confidence.

    Args:
        confidence: Model prediction probability (0.0-1.0)
        base_size: Base position (1% of capital)
        config: Trial 244 Alpha config with sizing rules

    Returns:
        position_size: Actual position in percent of capital
    """

    if confidence < 0.55:
        return 0.0  # Below threshold, skip trade
    elif confidence < 0.60:
        return base_size * 0.25
    elif confidence < 0.65:
        return base_size * 0.50
    elif confidence < 0.70:
        return base_size * 0.75
    else:
        return base_size  # Full position at 70%+ confidence
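The ladder can be written equivalently in table-driven form, which makes the confidence-to-size mapping easy to check at a glance (hypothetical helper, same cutoffs as above):

```python
def size_for(confidence: float, base: float = 0.01) -> float:
    """Table-driven equivalent of the confidence ladder above."""
    ladder = [(0.55, 0.00), (0.60, 0.25), (0.65, 0.50), (0.70, 0.75)]
    for cutoff, multiplier in ladder:
        if confidence < cutoff:
            return base * multiplier
    return base  # full position at 70%+ confidence

assert size_for(0.50) == 0.0          # below threshold -> skip trade
assert size_for(0.62) == 0.01 * 0.50  # half position
assert size_for(0.75) == 0.01         # full position
```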

Exit Conditions (Triple Barrier)

def check_exit_condition(entry_price, current_price, bars_held,
                        atr_value, config):
    """
    Check if position should be exited.

    Args:
        entry_price: Price at entry
        current_price: Current market price
        bars_held: Number of bars since entry
        atr_value: Current ATR for dynamic stops
        config: Trial 244 Alpha configuration

    Returns:
        exit_type: 'none', 'profit', 'loss', 'timeout'
    """

    atr_stop = atr_value * config['atr_multiplier']

    # Barrier 1: Take profit
    if current_price >= entry_price + atr_stop:
        return 'profit'

    # Barrier 2: Stop loss
    if current_price <= entry_price - atr_stop:
        return 'loss'

    # Barrier 3: Timeout
    if bars_held >= config['holding_period']:
        return 'timeout'

    return 'none'

Risk Management Framework

6-Layer Risk Enforcement

class RiskManager:
    """Independent risk management enforcement."""

    def __init__(self, initial_capital=100_000, config=None):
        self.capital = initial_capital
        self.peak_capital = initial_capital
        self.daily_pnl = 0
        self.weekly_pnl = 0
        self.positions = []
        self.config = config or self._default_config()

    @staticmethod
    def _default_config():
        return {
            'max_position_size': 0.10,      # 10% of capital
            'max_daily_loss': -0.05,        # 5% daily loss limit
            'max_weekly_loss': -0.10,       # 10% weekly loss
            'max_drawdown': -0.15,          # 15% drawdown limit
            'min_confidence': 0.55,         # Model confidence floor
            'max_volatility': 0.10,         # 10% volatility threshold
            'max_spread_bps': 50,           # 50 bps max spread
        }

    def check_entry_allowed(self, signal, confidence, current_volatility,
                          bid_ask_spread_bps):
        """Layer 1: Pre-trade authorization."""

        checks = {
            'signal_present': signal == 1,
            'confidence_ok': confidence >= self.config['min_confidence'],
            'volatility_ok': current_volatility <= self.config['max_volatility'],
            'spread_ok': bid_ask_spread_bps <= self.config['max_spread_bps'],
            'daily_loss_ok': self.daily_pnl / self.capital >= self.config['max_daily_loss'],  # fraction of capital
            'position_limit_ok': len(self.positions) < 3,  # Max 3 concurrent
        }

        allowed = all(checks.values())
        return allowed, checks

    def monitor_position(self, position):
        """Layer 2-6: Ongoing position monitoring."""

        exit_type = 'none'

        # Layer 2: Position limit
        if position['exposure'] > self.capital * self.config['max_position_size']:
            exit_type = 'position_limit'

        # Layer 3: Stop loss hit
        if position['current_price'] <= position['stop_loss']:
            exit_type = 'stop_loss'

        # Layer 4: Take profit hit
        if position['current_price'] >= position['take_profit']:
            exit_type = 'take_profit'

        # Layer 5: Daily loss breach (compared as a fraction of capital)
        if (self.daily_pnl + position['unrealized_pnl']) / self.capital <= self.config['max_daily_loss']:
            exit_type = 'daily_loss_limit'

        # Layer 6: Drawdown limit
        if self.capital / self.peak_capital <= (1 + self.config['max_drawdown']):
            exit_type = 'max_drawdown'

        return exit_type

    def update_pnl(self, position):
        """Update capital and P&L."""
        pnl = position['pnl']
        self.capital += pnl
        self.daily_pnl += pnl
        self.peak_capital = max(self.peak_capital, self.capital)
        return self.capital

Data Processing Pipeline

Real-Time Feature Computation

import numpy as np
import pandas as pd

class FeatureEngine:
    """Real-time feature computation with rolling windows."""

    def __init__(self, window_size=50):
        self.window = pd.DataFrame(columns=['open', 'high', 'low', 'close', 'volume'])
        self.window_size = window_size

    def add_bar(self, bar):
        """Add new dollar bar to rolling window."""
        self.window = pd.concat([self.window, pd.DataFrame([bar])], ignore_index=True)

        # Keep only last N bars
        if len(self.window) > self.window_size:
            self.window = self.window.iloc[-(self.window_size):]

        return self.compute_features()

    def compute_features(self):
        """Compute features from the current window (all lag-1, no look-ahead)."""

        if len(self.window) < self.window_size:
            return None  # Not enough data

        features = {}

        # Price features (lag-1 indices, consistent with the batch pipeline)
        close_lag = self.window['close'].shift(1)
        features['ret_1'] = close_lag.pct_change().iloc[-1]
        features['ret_3'] = (close_lag.iloc[-1] / close_lag.iloc[-3] - 1) if len(close_lag) >= 3 else 0
        features['ret_5'] = (close_lag.iloc[-1] / close_lag.iloc[-5] - 1) if len(close_lag) >= 5 else 0
        features['ret_accel'] = close_lag.pct_change().diff().iloc[-1]  # change in return, not in price

        high_20 = self.window['high'].shift(1).iloc[-20:].max()
        low_20 = self.window['low'].shift(1).iloc[-20:].min()
        features['close_pos'] = (close_lag.iloc[-1] - low_20) / (high_20 - low_20)

        # Volume features (lag-1)
        vol_lag = self.window['volume'].shift(1)
        vol_20 = vol_lag.iloc[-20:].mean()
        features['vol_20'] = vol_20
        features['high_vol'] = 1 if vol_lag.iloc[-1] > vol_20 * 1.5 else 0
        features['low_vol'] = 1 if vol_lag.iloc[-1] < vol_20 * 0.7 else 0

        # [Additional feature computations...]

        return np.array(list(features.values()))

Deployment Architecture

AWS Infrastructure

# ECS Task Definition
service: model-serving
cpu: 2048
memory: 4096
containers:
  - image: quantflux/inference:latest
    ports:
      - 8000
    environment:
      MODEL_PATH: s3://quantflux-models/trial_244_xgb.pkl
      SCALER_PATH: s3://quantflux-models/scaler.pkl
    healthCheck:
      command: ['CMD', 'curl', 'localhost:8000/health']
      interval: 30
      timeout: 5
      retries: 3

# Auto-scaling
desiredCount: 3
scalingPolicy:
  targetCPU: 70%
  targetMemory: 80%
  minTasks: 1
  maxTasks: 10

# Load balancing
alb:
  targetGroup: quantflux-inference
  healthCheckPath: /health
  healthCheckInterval: 30s

Inference Latency Targets

  • Feature computation: <20ms (50-bar rolling window)
  • Model inference: <30ms (XGBoost prediction)
  • Risk checks: <10ms (6-layer enforcement)
  • Order routing: <30ms (to exchange)
  • Total end-to-end: <100ms (sub-100ms requirement)
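The per-stage budgets above sum to 90ms, leaving roughly 10ms of headroom under the end-to-end target:

```python
# Per-stage latency budgets in milliseconds (from the targets above)
budget_ms = {'features': 20, 'inference': 30, 'risk_checks': 10, 'routing': 30}

total = sum(budget_ms.values())
assert total == 90 and total < 100  # ~10ms headroom under the 100ms target
```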

References

  1. de Prado, M. L. (2018). "Advances in Financial Machine Learning"
  2. Aronson, D. (2007). "Evidence-Based Technical Analysis"
  3. Cryptofeed: Real-time crypto data library
  4. XGBoost Documentation: https://xgboost.readthedocs.io/
  5. VectorBT: Backtesting framework

Document Version: 1.0
Updated: 2025-11-19
Author: QuantFlux Research Team