#!/usr/bin/env python3
"""
Comprehensive Test Suite for Trading Intelligence System
=========================================================
Tests each module independently, then runs full integration.
Every assertion is checked, every output is validated.

When run as a script the process exits with status 1 if any check failed,
so shells and CI jobs can detect a failing run.
"""
import sys
import os
import time
import traceback
import warnings

warnings.filterwarnings('ignore')

# Line-buffer stdout so results stream immediately when the output is piped.
# reconfigure() adjusts the existing stream in place; wrapping the descriptor
# in a second file object (os.fdopen) would leave two owners of fd 1 and drop
# the stream's configured encoding.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(line_buffering=True)

sys.path.insert(0, '/app')

import numpy as np
import pandas as pd
import torch

PASS = 0
FAIL = 0


def test(name, condition, detail=""):
    """Record one check: print a pass/fail line and bump the global counters.

    Args:
        name: Human-readable label of the check.
        condition: Truthy when the check passed.
        detail: Extra context printed only when the check failed.
    """
    global PASS, FAIL
    if condition:
        PASS += 1
        print(f" ✅ {name}")
    else:
        FAIL += 1
        print(f" ❌ {name} — {detail}")


# The helper's name matches pytest's default "test*" function pattern; opt it
# out of collection so a pytest run does not treat it as a test case with
# unknown fixtures `name` and `condition`.
test.__test__ = False


def section(title):
    """Print a banner that visually separates groups of checks."""
    print(f"\n{'='*70}")
    print(f" {title}")
    print(f"{'='*70}")


def _split_targets(y_tensor, num_horizons=3):
    """Split interleaved target columns into (directions, returns) tensors.

    The target columns are built below as [direction_h, return_h] pairs, one
    pair per horizon, so even columns hold directions and odd columns hold
    returns (assumes create_sequences keeps the target_cols order).
    """
    directions = torch.stack(
        [y_tensor[:, 2 * i] for i in range(num_horizons)], dim=1
    )
    returns = torch.stack(
        [y_tensor[:, 2 * i + 1] for i in range(num_horizons)], dim=1
    )
    return directions, returns


# ═══════════════════════════════════════════════════════
# GENERATE SYNTHETIC DATA (shared across all tests)
# ═══════════════════════════════════════════════════════
section("DATA GENERATION")

# Seed both RNGs: NumPy drives the synthetic prices, torch drives model
# initialisation and DataLoader shuffling in the training checks.
np.random.seed(42)
torch.manual_seed(42)

num_days = 1500
dt = 1/252
prices = [150.0]
vol = 0.20
# Price path with a mean-reverting, floored volatility and log-normal steps.
for _ in range(num_days - 1):
    vol = vol + 0.1 * (0.20 - vol) * dt + 0.3 * np.sqrt(dt) * np.random.normal()
    vol = max(vol, 0.05)
    ret = (0.08 - 0.5 * vol**2) * dt + vol * np.sqrt(dt) * np.random.normal()
    prices.append(prices[-1] * np.exp(ret))

df = pd.DataFrame({
    'date': pd.date_range('2019-01-02', periods=num_days, freq='B')[:num_days],
    'open': [p * (1 + np.random.normal(0, 0.002)) for p in prices],
    'high': [p * (1 + abs(np.random.normal(0, 0.01))) for p in prices],
    'low': [p * (1 - abs(np.random.normal(0, 0.01))) for p in prices],
    'close': prices,
    'volume': [int(1e6 * np.exp(np.random.normal(0, 0.3))) for _ in range(num_days)],
})
# Force a consistent bar: high above and low below every other price.
df['high'] = df[['open', 'high', 'close']].max(axis=1) * (1 + abs(np.random.normal(0, 0.002, num_days)))
df['low'] = df[['open', 'low', 'close']].min(axis=1) * (1 - abs(np.random.normal(0, 0.002, num_days)))

test("Data generated", len(df) == num_days, f"got {len(df)}")
test("OHLCV columns present", all(c in df.columns for c in ['open','high','low','close','volume']))
test("No NaN in raw data", df[['open','high','low','close','volume']].isna().sum().sum() == 0)
test("High >= Low", (df['high'] >= df['low']).all())
test("High >= Close", (df['high'] >= df['close']).all())
test("Low <= Close", (df['low'] <= df['close']).all())
print(f" Price range: ${min(prices):.2f} — ${max(prices):.2f}")


# ═══════════════════════════════════════════════════════
# TEST 1: FEATURE ENGINE
# ═══════════════════════════════════════════════════════
section("TEST 1: FEATURE ENGINE")

try:
    from trading_intelligence.feature_engine import FeatureEngine, SentimentFeatureEngine

    fe = FeatureEngine(lookback_window=30, prediction_horizons=[1, 5, 20])
    features = fe.compute_all_features(df)

    test("Feature engine initializes", True)
    test("Features computed", len(features) > 0, f"got {len(features)} rows")
    test("No NaN after dropna", features[fe.feature_names].isna().sum().sum() == 0)
    test("Feature count >= 60", len(fe.feature_names) >= 60, f"got {len(fe.feature_names)}")

    # Check specific feature groups
    price_feats = [f for f in fe.feature_names if 'return' in f or 'momentum' in f or 'body' in f]
    tech_feats = [f for f in fe.feature_names if 'rsi' in f or 'macd' in f or 'ema' in f or 'bb_' in f]
    vol_feats = [f for f in fe.feature_names if 'vol' in f.lower() and 'obv' not in f]
    regime_feats = [f for f in fe.feature_names if 'regime' in f or 'trend' in f or 'hurst' in f]

    test("Price features exist", len(price_feats) > 5, f"got {len(price_feats)}")
    test("Technical indicators exist", len(tech_feats) > 10, f"got {len(tech_feats)}")
    test("Volatility features exist", len(vol_feats) > 5, f"got {len(vol_feats)}")
    test("Regime features exist", len(regime_feats) > 2, f"got {len(regime_feats)}")

    # Check targets
    for h in [1, 5, 20]:
        test(f"Target direction_{h} exists", f'target_direction_{h}' in features.columns)
        test(f"Target return_{h} exists", f'target_return_{h}' in features.columns)
        vals = features[f'target_direction_{h}'].dropna().unique()
        test(f"Direction_{h} is binary", set(vals).issubset({0.0, 1.0}), f"got {vals[:5]}")

    # Normalization
    features_norm, norm_params = fe.normalize_features(features)
    test("Normalization produces params", len(norm_params) > 0)
    test("Normalized features have similar scale",
         abs(features_norm[fe.feature_names].mean().mean()) < 1.0,
         f"mean={features_norm[fe.feature_names].mean().mean():.4f}")

    # Sequence creation
    target_cols = []
    for h in [1, 5, 20]:
        target_cols.extend([f'target_direction_{h}', f'target_return_{h}'])

    X, y = fe.create_sequences(features_norm, target_cols=target_cols)
    # Keep only samples that are finite in both inputs and targets.
    valid = np.isfinite(X).all(axis=(1, 2)) & np.isfinite(y).all(axis=1)
    X, y = X[valid], y[valid]

    test("Sequences created", X.shape[0] > 0)
    test("X shape correct (N, C, L)", len(X.shape) == 3)
    test("X channels match features", X.shape[1] == len(fe.feature_names),
         f"{X.shape[1]} vs {len(fe.feature_names)}")
    test("X lookback correct", X.shape[2] == 30, f"got {X.shape[2]}")
    test("Y targets = 6 (3 horizons × 2)", y.shape[1] == 6, f"got {y.shape[1]}")
    test("No NaN/Inf in X", np.isfinite(X).all())
    test("No NaN/Inf in y", np.isfinite(y).all())

    # Sentiment engine
    se = SentimentFeatureEngine()
    score = se.compute_rule_based_sentiment("Stock upgraded, strong growth expected, bullish outlook")
    test("Sentiment positive for bullish text", score > 0, f"score={score:.3f}")
    score_neg = se.compute_rule_based_sentiment("Stock crashed, massive loss, bearish outlook")
    test("Sentiment negative for bearish text", score_neg < 0, f"score={score_neg:.3f}")

    print(f"\n 📊 Feature Summary: {len(fe.feature_names)} features, {X.shape[0]} samples")
    print(f" Feature names: {fe.feature_names[:10]}...")

except Exception as e:
    test("Feature engine module", False, f"EXCEPTION: {e}")
    traceback.print_exc()


# ═══════════════════════════════════════════════════════
# TEST 2: PREDICTION MODEL
# ═══════════════════════════════════════════════════════
section("TEST 2: PREDICTION MODEL")

try:
    from trading_intelligence.prediction_model import (
        PatchEmbedding, PositionalEncoding, MultiHeadAttention,
        TransformerBlock, ChannelMixer, PredictionHead,
        TradingTransformer, MultiTaskLoss
    )

    device = torch.device('cpu')
    # Depends on the sequences built in TEST 1.
    num_channels = X.shape[1]
    batch_size = 16

    # Test PatchEmbedding
    pe = PatchEmbedding(patch_len=6, stride=3, d_model=64)
    test_input = torch.randn(batch_size, num_channels, 30)
    patches = pe(test_input)
    test("PatchEmbedding forward", patches.shape[0] == batch_size)
    test("PatchEmbedding output 4D", len(patches.shape) == 4)
    test("PatchEmbedding d_model=64", patches.shape[-1] == 64)

    # Test PositionalEncoding
    pos = PositionalEncoding(d_model=64)
    pos_out = pos(patches)
    test("PositionalEncoding same shape", pos_out.shape == patches.shape)

    # Test MultiHeadAttention
    mha = MultiHeadAttention(d_model=64, n_heads=4)
    attn_input = torch.randn(batch_size, 10, 64)
    attn_out = mha(attn_input)
    test("MHA forward", attn_out.shape == attn_input.shape)

    # Test TransformerBlock
    tb = TransformerBlock(d_model=64, n_heads=4, d_ff=128)
    tb_out = tb(attn_input)
    test("TransformerBlock forward", tb_out.shape == attn_input.shape)

    # Test ChannelMixer
    cm = ChannelMixer(num_channels=num_channels, d_model=64, n_heads=4)
    cm_out = cm(patches)
    test("ChannelMixer forward", cm_out.shape == patches.shape)

    # Test PredictionHead
    ph = PredictionHead(d_model=64, num_horizons=3)
    ph_input = torch.randn(batch_size, 64)
    ph_out = ph(ph_input)
    test("PredictionHead returns dict", isinstance(ph_out, dict))
    test("PredictionHead has direction_logits", 'direction_logits' in ph_out)
    test("PredictionHead has expected_return", 'expected_return' in ph_out)
    test("PredictionHead has log_variance", 'log_variance' in ph_out)
    test("Direction shape (B, 3)", ph_out['direction_logits'].shape == (batch_size, 3))

    # Test Full TradingTransformer
    model = TradingTransformer(
        num_channels=num_channels, seq_len=30, patch_len=6, stride=3,
        d_model=64, n_heads=4, n_layers=2, d_ff=128,
        num_horizons=3, dropout=0.1,
    ).to(device)

    param_count = sum(p.numel() for p in model.parameters())
    test("Model instantiates", True)
    test("Model has parameters", param_count > 0, f"{param_count:,} params")

    x_batch = torch.FloatTensor(X[:batch_size]).to(device)
    output = model(x_batch)
    test("Model forward pass", isinstance(output, dict))
    test("Output direction_logits shape", output['direction_logits'].shape == (batch_size, 3))
    test("Output expected_return shape", output['expected_return'].shape == (batch_size, 3))
    test("Output log_variance shape", output['log_variance'].shape == (batch_size, 3))
    test("No NaN in output", all(torch.isfinite(v).all().item() for v in output.values()))

    # Test predict_with_confidence
    preds = model.predict_with_confidence(x_batch)
    test("predict_with_confidence returns dict", isinstance(preds, dict))
    test("direction_probs in [0,1]",
         (preds['direction_probs'] >= 0).all() and (preds['direction_probs'] <= 1).all())
    test("confidence in [0,1]",
         (preds['confidence'] >= 0).all() and (preds['confidence'] <= 1).all())

    # Test MultiTaskLoss
    loss_fn = MultiTaskLoss(num_horizons=3).to(device)
    y_batch = torch.FloatTensor(y[:batch_size]).to(device)
    directions, returns = _split_targets(y_batch)
    targets = {'direction': directions, 'returns': returns}
    losses = loss_fn(output, targets)
    test("Loss computes", isinstance(losses, dict))
    test("total_loss is scalar", losses['total_loss'].dim() == 0)
    test("total_loss is finite", torch.isfinite(losses['total_loss']).item())
    test("direction_loss exists", 'direction_loss' in losses)
    test("return_loss exists", 'return_loss' in losses)
    test("risk_loss exists", 'risk_loss' in losses)

    # Test backward pass
    losses['total_loss'].backward()
    grads_ok = all(
        p.grad is not None and torch.isfinite(p.grad).all()
        for p in model.parameters() if p.requires_grad
    )
    test("Backward pass - gradients computed", grads_ok)

    print(f"\n 🧠 Model: {param_count:,} params, output verified across all heads")

except Exception as e:
    test("Prediction model module", False, f"EXCEPTION: {e}")
    traceback.print_exc()


# ═══════════════════════════════════════════════════════
# TEST 3: TRAINING LOOP (abbreviated)
# ═══════════════════════════════════════════════════════
section("TEST 3: TRAINING LOOP")

try:
    from torch.utils.data import TensorDataset, DataLoader

    # Split data chronologically: 70% train, 15% validation, 15% test
    n = len(X)
    train_end = int(n * 0.7)
    val_end = int(n * 0.85)

    X_train, y_train = X[:train_end], y[:train_end]
    X_val, y_val = X[train_end:val_end], y[train_end:val_end]
    X_test, y_test = X[val_end:], y[val_end:]

    test("Train set size > 0", len(X_train) > 0, f"{len(X_train)}")
    test("Val set size > 0", len(X_val) > 0, f"{len(X_val)}")
    test("Test set size > 0", len(X_test) > 0, f"{len(X_test)}")

    # Re-init model fresh
    model = TradingTransformer(
        num_channels=num_channels, seq_len=30, patch_len=6, stride=3,
        d_model=64, n_heads=4, n_layers=2, d_ff=128,
        num_horizons=3, dropout=0.1,
    ).to(device)
    loss_fn = MultiTaskLoss(num_horizons=3).to(device)
    # The loss module's parameters are optimised together with the model's.
    optimizer = torch.optim.AdamW(
        list(model.parameters()) + list(loss_fn.parameters()),
        lr=1e-3, weight_decay=1e-4
    )

    train_loader = DataLoader(
        TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)),
        batch_size=128, shuffle=True
    )
    val_loader = DataLoader(
        TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)),
        batch_size=128, shuffle=False
    )

    # Train 5 epochs and check loss decreases
    train_losses = []
    val_losses = []
    val_accs = []

    for epoch in range(5):
        # Train
        model.train()
        epoch_loss = 0
        n_batch = 0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            preds = model(xb)
            dirs, rets = _split_targets(yb)
            loss_dict = loss_fn(preds, {'direction': dirs, 'returns': rets})
            optimizer.zero_grad()
            loss_dict['total_loss'].backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            epoch_loss += loss_dict['total_loss'].item()
            n_batch += 1
        train_losses.append(epoch_loss / n_batch)

        # Validate
        model.eval()
        v_loss = 0
        v_batches = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for xb, yb in val_loader:
                xb, yb = xb.to(device), yb.to(device)
                preds = model(xb)
                dirs, rets = _split_targets(yb)
                loss_dict = loss_fn(preds, {'direction': dirs, 'returns': rets})
                v_loss += loss_dict['total_loss'].item()
                v_batches += 1
                # Direction accuracy is scored on the first horizon only.
                dir_preds = (torch.sigmoid(preds['direction_logits']) > 0.5).float()
                correct += (dir_preds[:, 0] == dirs[:, 0]).sum().item()
                total += len(xb)
        val_losses.append(v_loss / v_batches)
        val_accs.append(correct / total)

        print(f" Epoch {epoch+1}: train_loss={train_losses[-1]:.4f} val_loss={val_losses[-1]:.4f} DA-1d={val_accs[-1]:.1%}")

    test("Training runs without error", True)
    test("Train loss decreases", train_losses[-1] < train_losses[0],
         f"{train_losses[0]:.4f} → {train_losses[-1]:.4f}")
    test("Val loss decreases", val_losses[-1] < val_losses[0],
         f"{val_losses[0]:.4f} → {val_losses[-1]:.4f}")
    test("Direction accuracy > random (40%)", val_accs[-1] > 0.40, f"{val_accs[-1]:.1%}")
    test("No NaN in losses", all(np.isfinite(l) for l in train_losses + val_losses))

    # Save and reload
    os.makedirs('/app/models', exist_ok=True)
    save_path = '/app/models/test_model.pt'
    torch.save({
        'model_state': model.state_dict(),
        'config': {'num_channels': num_channels, 'd_model': 64, 'n_heads': 4,
                   'n_layers': 2, 'd_ff': 128, 'patch_len': 6, 'stride': 3}
    }, save_path)
    test("Model saves", os.path.exists(save_path))

    # weights_only=False unpickles arbitrary objects; acceptable here only
    # because the checkpoint was written by this script a few lines above.
    checkpoint = torch.load(save_path, map_location='cpu', weights_only=False)
    model2 = TradingTransformer(
        num_channels=num_channels, seq_len=30, patch_len=6, stride=3,
        d_model=64, n_heads=4, n_layers=2, d_ff=128, num_horizons=3
    )
    model2.load_state_dict(checkpoint['model_state'])
    model2.eval()

    # `model` is still in eval mode from the last validation pass, so both
    # forward passes run without dropout.
    with torch.no_grad():
        out1 = model(torch.FloatTensor(X[:4]))
        out2 = model2(torch.FloatTensor(X[:4]))
    test("Saved/loaded model produces same output",
         torch.allclose(out1['direction_logits'], out2['direction_logits'], atol=1e-5))

    print(f"\n 📈 Training verified: loss {train_losses[0]:.4f} → {train_losses[-1]:.4f}")

except Exception as e:
    test("Training loop", False, f"EXCEPTION: {e}")
    traceback.print_exc()


# ═══════════════════════════════════════════════════════
# TEST 4: RISK MODEL
# ═══════════════════════════════════════════════════════
section("TEST 4: RISK MODEL")

try:
    from trading_intelligence.risk_model import (
        PortfolioEncoder, TraderBehaviorAnalyzer, RiskModel, RiskLoss
    )

    B = 4

    # Test PortfolioEncoder
    pe = PortfolioEncoder(position_dim=8, max_positions=5, d_model=64)
    positions = torch.randn(B, 5, 8)
    account = torch.randn(B, 6)
    mask = torch.ones(B, 5, dtype=torch.bool)
    mask[:, 3:] = False
    port_repr = pe(positions, account, mask)
    test("PortfolioEncoder forward", port_repr.shape == (B, 64))
    test("PortfolioEncoder no NaN", torch.isfinite(port_repr).all())

    # Test TraderBehaviorAnalyzer
    tba = TraderBehaviorAnalyzer(trade_dim=12, d_model=64, n_layers=2)
    trade_hist = torch.randn(B, 20, 12)
    behavior = tba(trade_hist)
    test("BehaviorAnalyzer returns dict", isinstance(behavior, dict))
    test("risk_appetite shape", behavior['risk_appetite'].shape == (B,))
    test("risk_appetite in [0,1]",
         (behavior['risk_appetite'] >= 0).all() and (behavior['risk_appetite'] <= 1).all())
    test("overtrading_prob in [0,1]",
         (behavior['overtrading_prob'] >= 0).all() and (behavior['overtrading_prob'] <= 1).all())
    test("revenge_trading_prob in [0,1]",
         (behavior['revenge_trading_prob'] >= 0).all() and (behavior['revenge_trading_prob'] <= 1).all())
    test("trader_type_logits shape", behavior['trader_type_logits'].shape == (B, 5))
    test("behavior_embedding shape", behavior['behavior_embedding'].shape == (B, 64))

    # Test Full RiskModel
    rm = RiskModel(market_dim=64, portfolio_dim=64, behavior_dim=64)
    rm.eval()
    market_state = torch.randn(B, 64)
    with torch.no_grad():
        risk_out = rm(market_state, positions, account, trade_hist, mask)

    test("RiskModel returns dict", isinstance(risk_out, dict))
    test("risk_score shape", risk_out['risk_score'].shape == (B,))
    test("risk_score in [0,1]",
         (risk_out['risk_score'] >= 0).all() and (risk_out['risk_score'] <= 1).all())
    test("adjusted_position_size in [0,1]",
         (risk_out['adjusted_position_size'] >= 0).all() and (risk_out['adjusted_position_size'] <= 1).all())
    test("stop_loss_atr_mult >= 0", (risk_out['stop_loss_atr_mult'] >= 0).all())
    test("take_profit_atr_mult >= 0", (risk_out['take_profit_atr_mult'] >= 0).all())
    test("drawdown_probs shape", risk_out['drawdown_probs'].shape == (B, 4))
    test("drawdown_probs in [0,1]",
         (risk_out['drawdown_probs'] >= 0).all() and (risk_out['drawdown_probs'] <= 1).all())
    test("var_estimates shape", risk_out['var_estimates'].shape == (B, 3))
    test("behavior_profile in output", 'behavior_profile' in risk_out)

    # Test RiskLoss
    rl = RiskLoss()
    targets = {
        'actual_risk': torch.rand(B),
        'optimal_position_size': torch.rand(B),
        'drawdown_occurred': torch.rand(B, 4),
    }
    risk_losses = rl(risk_out, targets)
    test("RiskLoss computes", 'total_loss' in risk_losses)
    test("RiskLoss finite", torch.isfinite(risk_losses['total_loss']))

    print(f"\n 🛡️ Risk model: all outputs verified, shapes correct")

except Exception as e:
    test("Risk model module", False, f"EXCEPTION: {e}")
    traceback.print_exc()


# ═══════════════════════════════════════════════════════
# TEST 5: PERSONALIZATION
# ═══════════════════════════════════════════════════════
section("TEST 5: PERSONALIZATION")

try:
    from trading_intelligence.personalization import (
        TraderProfiler, BehaviorAlertSystem, PersonalizationEngine, TRADER_TYPES
    )

    test("5 trader types defined", len(TRADER_TYPES) == 5)

    profiler = TraderProfiler()

    # Test with conservative trader
    conservative_trades = [
        {'entry_price': 100, 'exit_price': 101, 'size': 0.01, 'pnl': 10,
         'holding_time': 2880, 'direction': 1}
    ] * 20 + [
        {'entry_price': 100, 'exit_price': 99.5, 'size': 0.01, 'pnl': -5,
         'holding_time': 1440, 'direction': 1}
    ] * 8

    feats = profiler.extract_behavior_features(conservative_trades)
    test("Feature extraction returns array", isinstance(feats, np.ndarray))
    test("15 behavior features", len(feats) == 15, f"got {len(feats)}")
    test("Win rate correct", abs(feats[0] - 20/28) < 0.01, f"got {feats[0]:.3f}")

    profile = profiler.predict_type(feats)
    test("Profile returns dict", isinstance(profile, dict))
    test("Profile has cluster", 'cluster' in profile)
    test("Profile has type_name", 'type_name' in profile)
    test("Conservative classified as Swing/Conservative",
         profile['type_name'] in ['Swing Trader', 'Conservative'])
    print(f" Conservative Carol → {profile['type_name']}")

    # Test aggressive trader
    aggressive_trades = [
        {'entry_price': 100, 'exit_price': 105, 'size': 0.15, 'pnl': 750,
         'holding_time': 60, 'direction': 1}
    ] * 12 + [
        {'entry_price': 100, 'exit_price': 93, 'size': 0.20, 'pnl': -1400,
         'holding_time': 30, 'direction': 1}
    ] * 10
    agg_feats = profiler.extract_behavior_features(aggressive_trades)
    agg_profile = profiler.predict_type(agg_feats)
    test("Aggressive classified correctly",
         agg_profile['type_name'] in ['Aggressive', 'Moderate'])
    print(f" Aggressive Alex → {agg_profile['type_name']}")

    # Test scalper
    scalper_trades = [
        {'entry_price': 100, 'exit_price': 100.1, 'size': 0.03, 'pnl': 3,
         'holding_time': 2, 'direction': 1}
    ] * 80 + [
        {'entry_price': 100, 'exit_price': 99.95, 'size': 0.03, 'pnl': -1.5,
         'holding_time': 1, 'direction': -1}
    ] * 50
    scalp_feats = profiler.extract_behavior_features(scalper_trades)
    scalp_profile = profiler.predict_type(scalp_feats)
    test("Scalper classified correctly", scalp_profile['type_name'] == 'Scalper')
    print(f" Scalper Sam → {scalp_profile['type_name']}")

    # Test empty trades
    empty_feats = profiler.extract_behavior_features([])
    test("Empty trades returns zeros", np.all(empty_feats == 0))

    # Test BehaviorAlertSystem
    alert_system = BehaviorAlertSystem()

    # Normal situation
    normal_alerts = alert_system.analyze(conservative_trades[-5:], 100000, 2.0)
    test("Normal status", normal_alerts['status'] in ['normal', 'warning'])

    # Overtrading scenario (10+ trades in 1 hour)
    many_trades = [{'entry_price': 100, 'exit_price': 100.1, 'size': 0.01, 'pnl': 1,
                    'holding_time': 1, 'direction': 1}] * 15
    over_alerts = alert_system.analyze(many_trades, 100000, 1.0)
    test("Overtrading detected",
         any(a['type'] == 'OVERTRADING' for a in over_alerts['alerts']),
         f"alerts: {[a['type'] for a in over_alerts['alerts']]}")

    # Loss streak scenario
    loss_trades = [{'entry_price': 100, 'exit_price': 99, 'size': 0.05, 'pnl': -50,
                    'holding_time': 60, 'direction': 1}] * 5
    loss_alerts = alert_system.analyze(loss_trades, 100000, 1.0)
    test("Loss streak detected",
         any(a['type'] == 'LOSS_STREAK' for a in loss_alerts['alerts']))

    # Excessive drawdown
    big_loss = [{'entry_price': 100, 'exit_price': 80, 'size': 0.2, 'pnl': -20000,
                 'holding_time': 30, 'direction': 1}] * 3
    dd_alerts = alert_system.analyze(big_loss, 100000, 1.0)
    test("Excessive drawdown detected",
         any(a['type'] == 'EXCESSIVE_DRAWDOWN' for a in dd_alerts['alerts']))
    test("Critical status on drawdown", dd_alerts['status'] == 'critical')

    # Test PersonalizationEngine
    engine = PersonalizationEngine()
    params = engine.get_personalized_params(
        {'cluster': 0, 'type_name': 'Conservative'},
        {'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'}
    )
    test("Personalization returns params", isinstance(params, dict))
    test("Conservative max_position <= 2%", params['max_position_pct'] <= 0.02)
    test("Conservative min_confidence >= 70%", params['min_confidence'] >= 0.7)

    # With revenge trading alert
    revenge_params = engine.get_personalized_params(
        {'cluster': 2, 'type_name': 'Aggressive'},
        {'alerts': [{'type': 'REVENGE_TRADING', 'severity': 'CRITICAL', 'message': 'test'}],
         'risk_multiplier': 0.3, 'status': 'critical'}
    )
    test("Revenge trading increases min_confidence", revenge_params['min_confidence'] > 0.55)
    test("Risk multiplier reduces position size", revenge_params['max_position_pct'] < 0.10)

    print(f"\n 👤 Personalization: all trader types, alerts, and adaptations verified")

except Exception as e:
    test("Personalization module", False, f"EXCEPTION: {e}")
    traceback.print_exc()


# ═══════════════════════════════════════════════════════
# TEST 6: DECISION ENGINE
# ═══════════════════════════════════════════════════════
section("TEST 6: DECISION ENGINE")

try:
    from trading_intelligence.decision_engine import (
        DecisionEngine, Signal, TradingDecision, format_decision
    )
    # Imported here as well so this section reports the real import error
    # instead of a NameError when TEST 5 could not load the module.
    from trading_intelligence.personalization import PersonalizationEngine

    # Uses the model trained in TEST 3.
    engine = DecisionEngine(
        prediction_model=model,
        personalization_engine=PersonalizationEngine(),
    )

    test_features = np.random.randn(1, num_channels, 30).astype(np.float32)

    # Basic decision
    decision = engine.make_decision(
        market_features=test_features,
        trader_profile={'cluster': 1, 'type_name': 'Moderate'},
        behavior_alerts={'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'},
        current_atr=0.015,
        horizon_idx=0,
    )
    test("Decision is TradingDecision", isinstance(decision, TradingDecision))
    test("Signal is valid enum", isinstance(decision.signal, Signal))
    test("Confidence in [0,1]", 0 <= decision.confidence <= 1, f"{decision.confidence:.3f}")
    test("Direction prob in [0,1]", 0 <= decision.direction_prob <= 1)
    test("Risk score in [0,1]", 0 <= decision.risk_score <= 1)
    test("Position size > 0", decision.position_size_pct > 0)
    test("Stop loss > 0", decision.stop_loss_pct > 0)
    test("Take profit > 0", decision.take_profit_pct > 0)
    test("Has reasoning", len(decision.reasoning) > 0)
    test("Has horizon label", decision.horizon in ['short_term', 'mid_term', 'long_term'])
    print(f" Decision: {decision.signal.value} (conf={decision.confidence:.1%})")

    # Multi-horizon decisions
    decisions = engine.make_multi_horizon_decisions(
        market_features=test_features,
        trader_profile={'cluster': 1, 'type_name': 'Moderate'},
        behavior_alerts={'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'},
        current_atr=0.015,
    )
    test("3 horizon decisions", len(decisions) == 3)
    test("Horizons are different", len(set(d.horizon for d in decisions)) == 3,
         f"{[d.horizon for d in decisions]}")
    for d in decisions:
        print(f" {d.horizon}: {d.signal.value} (conf={d.confidence:.1%}, dir={d.direction_prob:.1%})")

    # Critical alert override
    critical_decision = engine.make_decision(
        market_features=test_features,
        trader_profile={'cluster': 2, 'type_name': 'Aggressive'},
        behavior_alerts={
            'alerts': [{'type': 'REVENGE_TRADING', 'severity': 'CRITICAL', 'message': 'test'}],
            'risk_multiplier': 0.3,
            'status': 'critical'
        },
        current_atr=0.015,
        horizon_idx=0,
    )
    test("Critical alert forces HOLD", critical_decision.signal == Signal.HOLD)
    test("Alert in reasoning", any('CRITICAL' in r for r in critical_decision.reasoning))

    # Test format_decision
    formatted = format_decision(decision)
    test("format_decision returns string", isinstance(formatted, str))
    test("format_decision has signal", decision.signal.value in formatted)
    test("format_decision has confidence", 'Confidence' in formatted)

    # Test without model (defaults)
    engine_no_model = DecisionEngine()
    default_decision = engine_no_model.make_decision(
        market_features=test_features,
        current_atr=0.015,
    )
    test("Works without model (defaults)", isinstance(default_decision, TradingDecision))

    print(f"\n 🎯 Decision engine: all signal types, alerts, formatting verified")

except Exception as e:
    test("Decision engine module", False, f"EXCEPTION: {e}")
    traceback.print_exc()


# ═══════════════════════════════════════════════════════
# TEST 7: EVALUATION & BACKTESTING
# ═══════════════════════════════════════════════════════
section("TEST 7: EVALUATION & BACKTESTING")

try:
    from trading_intelligence.evaluation import Evaluator, format_evaluation

    evaluator = Evaluator(prediction_horizons=[1, 5, 20], trading_costs=0.001)

    # Uses the held-out split and the model produced by TEST 3.
    test_loader = DataLoader(
        TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test)),
        batch_size=128, shuffle=False
    )

    eval_results = evaluator.evaluate_predictions(model, test_loader, device)

    test("Evaluation returns dict", isinstance(eval_results, dict))
    test("Has summary", 'summary' in eval_results)
    test("Has horizon_1", 'horizon_1' in eval_results)
    test("Has horizon_5", 'horizon_5' in eval_results)
    test("Has horizon_20", 'horizon_20' in eval_results)

    summary = eval_results['summary']
    test("num_test_samples > 0", summary['num_test_samples'] > 0)
    test("avg_direction_accuracy in [0,1]", 0 <= summary['avg_direction_accuracy'] <= 1)
    test("avg_ic is finite", np.isfinite(summary['avg_ic']))

    for h in [1, 5, 20]:
        hr = eval_results[f'horizon_{h}']
        test(f"H{h} direction_accuracy in [0,1]", 0 <= hr['direction_accuracy'] <= 1)
        test(f"H{h} sharpe_ratio is finite", np.isfinite(hr['sharpe_ratio']))
        test(f"H{h} max_drawdown in [0,1]", 0 <= hr['max_drawdown'] <= 1)
        test(f"H{h} profit_factor >= 0", hr['profit_factor'] >= 0)
        test(f"H{h} win_rate in [0,1]", 0 <= hr['win_rate'] <= 1)
        test(f"H{h} num_trades > 0", hr['num_trades'] > 0)
        print(f" H{h}: DA={hr['direction_accuracy']:.1%} IC={hr['information_coefficient']:.4f} "
              f"Sharpe={hr['sharpe_ratio']:.2f} DD={hr['max_drawdown']:.1%} PF={hr['profit_factor']:.2f}")

    # Test format_evaluation
    formatted = format_evaluation(eval_results)
    test("format_evaluation returns string", isinstance(formatted, str))
    test("format_evaluation has content", len(formatted) > 100)

    print(f"\n 📊 Evaluation: all metrics computed and validated")

except Exception as e:
    test("Evaluation module", False, f"EXCEPTION: {e}")
    traceback.print_exc()


# ═══════════════════════════════════════════════════════
# TEST 8: TRAINING PIPELINE (high-level API)
# ═══════════════════════════════════════════════════════
section("TEST 8: TRAINING PIPELINE (high-level)")

try:
    from trading_intelligence.training import TrainingPipeline, FinancialTimeSeriesDataset

    pipeline = TrainingPipeline(
        lookback_window=30,
        prediction_horizons=[1, 5, 20],
        d_model=64, n_heads=4, n_layers=2, d_ff=128,
        patch_len=6, stride=3, dropout=0.1,
        learning_rate=1e-3, batch_size=128,
        max_epochs=3, patience=2,
    )

    train_loader, val_loader, test_loader = pipeline.prepare_data(df)
    test("Pipeline prepare_data", True)
    test("Pipeline model initialized", pipeline.model is not None)
    test("Pipeline loss_fn initialized", pipeline.loss_fn is not None)

    results = pipeline.train(train_loader, val_loader)
    test("Pipeline train completes", 'best_val_loss' in results)
    test("Pipeline training history", len(results['history']) > 0)

    # Save/load cycle
    pipeline.save_model('/app/models/pipeline_model.pt')
    test("Pipeline model saved", os.path.exists('/app/models/pipeline_model.pt'))

    pipeline2 = TrainingPipeline(lookback_window=30, prediction_horizons=[1,5,20])
    pipeline2.load_model('/app/models/pipeline_model.pt')
    test("Pipeline model loaded", pipeline2.model is not None)

    print(f"\n 🔧 Training pipeline: prepare, train, save, load all verified")

except Exception as e:
    test("Training pipeline", False, f"EXCEPTION: {e}")
    traceback.print_exc()


# ═══════════════════════════════════════════════════════
# FULL INTEGRATION TEST
# ═══════════════════════════════════════════════════════
section("INTEGRATION TEST: FULL PIPELINE")

try:
    # Import everything this section uses so it does not depend on names
    # that earlier sections bind only when their own imports succeed.
    from trading_intelligence.feature_engine import FeatureEngine
    from trading_intelligence.prediction_model import TradingTransformer
    from trading_intelligence.risk_model import RiskModel
    from trading_intelligence.personalization import (
        TraderProfiler, BehaviorAlertSystem, PersonalizationEngine, TRADER_TYPES
    )
    from trading_intelligence.decision_engine import (
        DecisionEngine, Signal, format_decision
    )

    print(" Running complete end-to-end flow...")

    # 1. Raw data → Features
    fe = FeatureEngine(lookback_window=30, prediction_horizons=[1, 5, 20])
    features = fe.compute_all_features(df)
    features_norm, _ = fe.normalize_features(features)

    # 2. Features → Sequences
    target_cols = []
    for h in [1, 5, 20]:
        target_cols.extend([f'target_direction_{h}', f'target_return_{h}'])
    X_all, y_all = fe.create_sequences(features_norm, target_cols=target_cols)
    valid = np.isfinite(X_all).all(axis=(1, 2)) & np.isfinite(y_all).all(axis=1)
    X_all, y_all = X_all[valid], y_all[valid]

    # 3. Build model (left untrained here; this section only checks wiring)
    model_final = TradingTransformer(
        num_channels=X_all.shape[1], seq_len=30, patch_len=6, stride=3,
        d_model=64, n_heads=4, n_layers=2, d_ff=128, num_horizons=3
    )

    # 4. Get prediction
    model_final.eval()
    with torch.no_grad():
        sample = torch.FloatTensor(X_all[-1:])
        pred = model_final.predict_with_confidence(sample)

    # 5. Risk assessment
    rm = RiskModel(market_dim=64, portfolio_dim=64, behavior_dim=64)
    rm.eval()

    # 6. Personalization
    profiler = TraderProfiler()
    alert_system = BehaviorAlertSystem()
    pers = PersonalizationEngine()

    sample_trades = [
        {'entry_price': 100, 'exit_price': 101.5, 'size': 0.05, 'pnl': 75,
         'holding_time': 120, 'direction': 1}
    ] * 15 + [
        {'entry_price': 100, 'exit_price': 99, 'size': 0.05, 'pnl': -50,
         'holding_time': 60, 'direction': -1}
    ] * 8

    trader_feats = profiler.extract_behavior_features(sample_trades)
    trader_profile = profiler.predict_type(trader_feats)
    alerts = alert_system.analyze(sample_trades[-5:], 100000, 1.0)

    # 7. Decision
    decision_engine = DecisionEngine(
        prediction_model=model_final,
        personalization_engine=pers,
    )
    final_decision = decision_engine.make_decision(
        market_features=X_all[-1:],
        trader_profile=trader_profile,
        behavior_alerts=alerts,
        current_atr=0.015,
        horizon_idx=1,
    )

    test("Integration: features computed", len(features) > 0)
    test("Integration: sequences created", X_all.shape[0] > 0)
    test("Integration: prediction made", pred['direction_probs'].shape == (1, 3))
    test("Integration: trader profiled", trader_profile['type_name'] in TRADER_TYPES.values())
    test("Integration: decision generated", isinstance(final_decision.signal, Signal))

    print(f"\n Full pipeline output:")
    print(format_decision(final_decision))

    test("Integration: complete pipeline works", True)

except Exception as e:
    test("Integration test", False, f"EXCEPTION: {e}")
    traceback.print_exc()


# ═══════════════════════════════════════════════════════
# FINAL SUMMARY
# ═══════════════════════════════════════════════════════
section("TEST SUMMARY")
total = PASS + FAIL
print(f"\n ✅ Passed: {PASS}/{total}")
print(f" ❌ Failed: {FAIL}/{total}")
print(f" Pass Rate: {PASS/total*100:.1f}%")

if FAIL == 0:
    print(f"\n 🎉 ALL TESTS PASSED!")
else:
    print(f"\n ⚠️ {FAIL} test(s) need attention")

print(f"\n{'='*70}")

# Report failures through the exit status, but only when executed directly
# so that importing this module never terminates the importing process.
if __name__ == "__main__" and FAIL:
    sys.exit(1)