# ==============================================================================
# 🧠 ml_engine/pattern_engine.py (Refactored TitanEngine)
# ==============================================================================
# GEM-Architect Approved
# - Restored full feature engineering logic from original Titan.
# - Renamed Class: TitanEngine -> PatternEngine.
# - Renamed Model: TitanResNet -> PatternResNet.
# ==============================================================================

import os
import traceback
import warnings

import numpy as np
import pandas as pd
import torch
import torch.nn as nn

try:
    import joblib
except ImportError:  # Only needed to load the scaler; initialize() reports it.
    joblib = None

warnings.filterwarnings('ignore')


# ------------------------------------------------------------------------------
# 1. Model Architecture (Must match training EXACTLY)
# ------------------------------------------------------------------------------
class ResidualBlock(nn.Module):
    """Two Conv1d+BatchNorm stages with an identity skip connection.

    Channel count and sequence length are preserved (padding is
    ``kernel_size // 2``), so the input can be added back to the output.
    """

    def __init__(self, channels, kernel_size=3, dropout=0.2):
        super().__init__()
        # Attribute names are part of the checkpoint's state_dict keys.
        self.conv1 = nn.Conv1d(channels, channels, kernel_size, padding=kernel_size // 2)
        self.bn1 = nn.BatchNorm1d(channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv1d(channels, channels, kernel_size, padding=kernel_size // 2)
        self.bn2 = nn.BatchNorm1d(channels)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return ``relu(bn2(conv2(dropout(relu(bn1(conv1(x)))))) + x)``."""
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += residual
        out = self.relu(out)
        return out


class PatternResNet(nn.Module):
    """1-D residual CNN mapping ``(batch, in_ch, length)`` to 3 logits.

    The sub-module names and the order of layers inside each
    ``nn.Sequential`` determine the state_dict keys, so they must not be
    changed without retraining / re-exporting the checkpoint.
    """

    def __init__(self, in_ch):
        super().__init__()
        self.entry = nn.Sequential(
            nn.Conv1d(in_ch, 64, kernel_size=1),
            nn.BatchNorm1d(64),
            nn.ReLU(),
        )
        self.layer1 = nn.Sequential(
            ResidualBlock(64),
            nn.MaxPool1d(2),
        )
        self.layer2 = nn.Sequential(
            nn.Conv1d(64, 128, 3, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            ResidualBlock(128),
            nn.MaxPool1d(2),
        )
        self.layer3 = nn.Sequential(
            nn.Conv1d(128, 256, 3, padding=1),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            ResidualBlock(256),
            nn.AdaptiveAvgPool1d(1),
        )
        self.head = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(128, 3),
        )

    def forward(self, x):
        """Run the network and return raw (un-normalised) class logits."""
        x = self.entry(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        return self.head(x)


# ------------------------------------------------------------------------------
# 2. Production Engine Class
# ------------------------------------------------------------------------------
class PatternEngine:
    """Inference wrapper: OHLCV candles -> feature window -> PatternResNet."""

    def __init__(self, model_dir="ml_models/Unified_Models_V1"):
        # Expecting the same model file names
        self.model_path = os.path.join(model_dir, "cnn_best.pt")
        self.scaler_path = os.path.join(model_dir, "seq_scaler.pkl")
        self.model = None
        self.scaler = None
        self.device = torch.device("cpu")  # Inference on CPU is safer for stability
        self.initialized = False

        # Exact Features used in Training (Critical to keep)
        self.features_list = [
            "log_ret", "vol_spike", "taker_buy_ratio", "proxy_spread", "amihud",
            "avg_ticket_usd", "upper_wick_ratio", "lower_wick_ratio",
            "body_to_range", "atr_pct_signal",
        ]
        self.WINDOW_SIZE = 64

    async def initialize(self):
        """Load the scaler and the model weights.

        Returns:
            True when the engine is ready (or was already initialised),
            False when an artifact is missing or loading failed. Failures are
            printed, never raised.
        """
        if self.initialized:
            return True

        print(f"🧠 [PatternEngine] Initializing PyTorch Engine from {self.model_path}...")
        try:
            missing = [
                path for path in (self.model_path, self.scaler_path)
                if not os.path.exists(path)
            ]
            if missing:
                print(f"❌ [PatternEngine] Artifacts missing: {', '.join(missing)}")
                return False
            if joblib is None:
                print("❌ [PatternEngine] Init Error: joblib is not installed")
                return False

            # NOTE(security): joblib.load and torch.load unpickle their input;
            # only point model_dir at trusted artifacts.
            # 1. Load Scaler
            scaler = joblib.load(self.scaler_path)

            # 2. Load Model
            model = PatternResNet(in_ch=len(self.features_list)).to(self.device)
            checkpoint = torch.load(self.model_path, map_location=self.device)
            # The checkpoint is either a bare state_dict or a dict wrapping it
            # under the 'model' key.
            if isinstance(checkpoint, dict) and 'model' in checkpoint:
                state_dict = checkpoint['model']
            else:
                state_dict = checkpoint
            model.load_state_dict(state_dict)
            model.eval()  # Set to evaluation mode

            # Publish only after everything loaded, so a failed attempt never
            # leaves a half-initialised engine behind.
            self.scaler = scaler
            self.model = model
            self.initialized = True
            print("✅ [PatternEngine] Online. ResNet-1D Loaded successfully.")
            return True
        except Exception as e:
            print(f"❌ [PatternEngine] Init Error: {e}")
            traceback.print_exc()
            return False

    # --- Feature Engineering Helpers (Restored Fully) ---
    def _wilder_rma(self, x, n):
        """Wilder's smoothing: an EMA with ``alpha = 1 / n`` (adjust=False)."""
        x = np.asarray(x, dtype=float)
        return pd.Series(x).ewm(alpha=1.0 / n, adjust=False).mean().values

    def _rolling_mean(self, x, w):
        """Rolling mean over ``w`` samples; partial windows at the start are used."""
        return pd.Series(x).rolling(w, min_periods=1).mean().values

    def _rolling_median(self, x, w):
        """Rolling median over ``w`` samples; partial windows at the start are used."""
        return pd.Series(x).rolling(w, min_periods=1).median().values

    def preprocess_live_data(self, df):
        """Turn a raw OHLCV DataFrame into the feature matrix used for training.

        Args:
            df: DataFrame with 'open', 'high', 'low', 'close' and either
                'quote_volume' or 'volume'. Optional columns: 'timestamp'
                (used for sorting), 'taker_buy_base_asset_volume',
                'num_trades'. Numeric strings are accepted.

        Returns:
            float32 array of shape ``(WINDOW_SIZE, len(features_list))`` built
            from the most recent rows, or None when there are fewer than
            WINDOW_SIZE rows or preprocessing fails (the error is printed).
        """
        try:
            # Cheap guard first: no point computing features we cannot use.
            if len(df) < self.WINDOW_SIZE:
                return None

            df = df.copy()
            # Ensure sorting
            if 'timestamp' in df.columns:
                df = df.sort_values('timestamp')

            # Basic conversions
            close = df['close'].values.astype(float)
            high = df['high'].values.astype(float)
            low = df['low'].values.astype(float)
            open_ = df['open'].values.astype(float)

            # Use quote volume if available
            if 'quote_volume' in df.columns:
                vol_usd = df['quote_volume'].values.astype(float)
            else:
                # Cast volume like the price columns: exchange payloads often
                # carry numbers as strings.
                vol_usd = close * df['volume'].values.astype(float)
            vol_usd = np.maximum(vol_usd, 1.0)

            # 1. ATR (14)
            prev_close = np.roll(close, 1)
            prev_close[0] = close[0]  # np.roll wraps; the first bar has no predecessor
            tr = np.maximum(
                high - low,
                np.maximum(np.abs(high - prev_close), np.abs(low - prev_close)),
            )
            atr = self._wilder_rma(tr, 14)
            atr_safe = np.maximum(atr, 1e-9)

            # 2. Features Calculation
            df['log_ret'] = np.log(close / np.maximum(prev_close, 1e-9))

            vol_ma = self._rolling_mean(vol_usd, 20)
            df['vol_spike'] = vol_usd / np.maximum(vol_ma, 1e-9)

            # Taker buy ratio
            if 'taker_buy_base_asset_volume' in df.columns:
                taker_vol = df['taker_buy_base_asset_volume'].values.astype(float) * close
                df['taker_buy_ratio'] = taker_vol / vol_usd
            else:
                df['taker_buy_ratio'] = 0.5

            raw_spread = (high - low) / np.maximum(close, 1e-9)
            df['proxy_spread'] = self._rolling_median(raw_spread, 14) * 0.5

            df['amihud'] = np.abs(df['log_ret']) / vol_usd

            # Num trades proxy
            if 'num_trades' in df.columns:
                num_trades = df['num_trades'].values.astype(float)
            else:
                num_trades = vol_usd / 1000.0
            df['avg_ticket_usd'] = vol_usd / np.maximum(num_trades, 1.0)

            rng = np.maximum(high - low, 1e-9)
            df['upper_wick_ratio'] = (high - np.maximum(open_, close)) / rng
            df['lower_wick_ratio'] = (np.minimum(open_, close) - low) / rng
            df['body_to_range'] = np.abs(close - open_) / rng
            df['atr_pct_signal'] = atr_safe / close

            # Filter NaNs
            df = df.replace([np.inf, -np.inf], np.nan).fillna(0)

            # Extract only the needed window
            window = df[self.features_list].iloc[-self.WINDOW_SIZE:]
            return window.values.astype(np.float32)
        except Exception as e:
            print(f"❌ [PatternEngine] Preprocessing Error: {e}")
            return None

    def predict(self, ohlcv_data: dict) -> dict:
        """Main interface used by Processor.

        Args:
            ohlcv_data: mapping of timeframe -> candles. Only the '15m' entry
                is read; it may be a DataFrame or a list of
                ``[timestamp, open, high, low, close, volume]`` rows.

        Returns:
            On success ``{'score', 'probs', 'status': 'OK'}`` where 'score' is
            ``probs[2]``; otherwise ``{'score': 0.0, 'error': <message>}``.
            Exceptions are printed and reported through 'error', not raised.
        """
        if not self.initialized:
            return {'score': 0.0, 'error': 'Not Initialized'}

        try:
            target_tf = '15m'
            raw_data = ohlcv_data.get(target_tf)
            if raw_data is None:
                return {'score': 0.0, 'error': 'No 15m Data'}

            if isinstance(raw_data, list):
                df = pd.DataFrame(
                    raw_data,
                    columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'],
                )
            elif isinstance(raw_data, pd.DataFrame):
                df = raw_data
            else:
                return {'score': 0.0, 'error': f'Invalid Data Type: {type(raw_data)}'}

            if df.empty:
                return {'score': 0.0, 'error': 'Empty Data'}

            # Preprocess
            X_raw = self.preprocess_live_data(df)
            if X_raw is None:
                return {'score': 0.0, 'error': 'Not enough data for window'}

            # Scale
            X_scaled = self.scaler.transform(X_raw)

            # Prepare Tensor: (window, features) -> (1, features, window).
            # Force float32: a scaler returning float64 would otherwise yield
            # a Double tensor that the float32 model weights reject.
            X_tensor = torch.tensor(
                np.asarray(X_scaled).T, dtype=torch.float32
            ).unsqueeze(0).to(self.device)

            # Inference
            with torch.no_grad():
                logits = self.model(X_tensor)
                probs = torch.softmax(logits, dim=1).cpu().numpy()[0]

            return {
                'score': float(probs[2]),  # Win Probability
                'probs': probs.tolist(),   # [Neutral, Loss, Win]
                'status': 'OK',
            }
        except Exception as e:
            print(f"❌ [PatternEngine] Inference Error: {e}")
            traceback.print_exc()
            return {'score': 0.0, 'error': str(e)}