| |
| """ |
| Comprehensive Test Suite for Trading Intelligence System |
| ========================================================= |
| Tests each module independently, then runs full integration. |
| Every assertion is checked, every output is validated. |
| """ |
| import sys, os, time, traceback, warnings |
| warnings.filterwarnings('ignore') |
| sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', buffering=1) |
| sys.path.insert(0, '/app') |
|
|
| import numpy as np |
| import pandas as pd |
| import torch |
|
|
| PASS = 0 |
| FAIL = 0 |
|
|
def test(name, condition, detail=""):
    """Record and print the outcome of one named check.

    Increments the module-level ``PASS`` counter when ``condition`` is truthy
    and ``FAIL`` otherwise, printing one status line in either case.

    Args:
        name: Short human-readable label for the check.
        condition: Result of the check; only its truthiness is used.
        detail: Diagnostic text appended to the line when the check fails.
    """
    global PASS, FAIL
    if condition:
        PASS += 1
        print(f" ✅ {name}")
    else:
        FAIL += 1
        print(f" ❌ {name} — {detail}")
|
|
def section(title, width=70):
    """Print a banner that marks the start of a test section.

    Output is a blank line, a rule of ``=`` characters, the title line and a
    second rule.

    Args:
        title: Heading text printed between the two rules.
        width: Number of ``=`` characters per rule. Defaults to 70, the
            width that was previously hard-coded.
    """
    rule = "=" * width
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)
|
|
| |
| |
| |
# ----------------------------------------------------------------------
# DATA GENERATION: synthetic OHLCV frame (`df`) used by the sections below.
# ----------------------------------------------------------------------
section("DATA GENERATION")
np.random.seed(42)  # fixed seed: every run sees the same price path
num_days = 1500
dt = 1/252
prices = [150.0]
vol = 0.20

# Two normal draws per step, in this order: one perturbs the volatility
# (pulled toward 0.20 and floored at 0.05), one produces the log-return.
for _ in range(num_days - 1):
    vol = vol + 0.1 * (0.20 - vol) * dt + 0.3 * np.sqrt(dt) * np.random.normal()
    vol = max(vol, 0.05)
    ret = (0.08 - 0.5 * vol**2) * dt + vol * np.sqrt(dt) * np.random.normal()
    prices.append(prices[-1] * np.exp(ret))

# The column order matters: each comprehension consumes random numbers, so
# reordering the keys would change the generated data.
df = pd.DataFrame({
    # `periods=num_days` already yields exactly num_days dates, so no slice
    # is needed.
    'date': pd.date_range('2019-01-02', periods=num_days, freq='B'),
    'open': [p * (1 + np.random.normal(0, 0.002)) for p in prices],
    'high': [p * (1 + abs(np.random.normal(0, 0.01))) for p in prices],
    'low': [p * (1 - abs(np.random.normal(0, 0.01))) for p in prices],
    'close': prices,
    'volume': [int(1e6 * np.exp(np.random.normal(0, 0.3))) for _ in range(num_days)],
})
# Re-derive high/low from the row-wise extreme of open/high(low)/close and
# push it slightly outward, so high >= open, close and low <= open, close.
df['high'] = df[['open', 'high', 'close']].max(axis=1) * (1 + abs(np.random.normal(0, 0.002, num_days)))
df['low'] = df[['open', 'low', 'close']].min(axis=1) * (1 - abs(np.random.normal(0, 0.002, num_days)))

test("Data generated", len(df) == num_days, f"got {len(df)}")
test("OHLCV columns present", all(c in df.columns for c in ['open','high','low','close','volume']))
test("No NaN in raw data", df[['open','high','low','close','volume']].isna().sum().sum() == 0)
test("High >= Low", (df['high'] >= df['low']).all())
test("High >= Close", (df['high'] >= df['close']).all())
test("Low <= Close", (df['low'] <= df['close']).all())
print(f" Price range: ${min(prices):.2f} — ${max(prices):.2f}")
|
|
| |
| |
| |
# ----------------------------------------------------------------------
# TEST 1: feature computation, normalisation, sequence building, sentiment.
# Leaves `fe`, `X` and `y` at module level for the later sections.
# ----------------------------------------------------------------------
section("TEST 1: FEATURE ENGINE")
try:
    from trading_intelligence.feature_engine import FeatureEngine, SentimentFeatureEngine

    fe = FeatureEngine(lookback_window=30, prediction_horizons=[1, 5, 20])
    features = fe.compute_all_features(df)

    test("Feature engine initializes", True)
    test("Features computed", len(features) > 0, f"got {len(features)} rows")
    test("No NaN after dropna", features[fe.feature_names].isna().sum().sum() == 0)
    test("Feature count >= 60", len(fe.feature_names) >= 60, f"got {len(fe.feature_names)}")

    # Bucket feature names by substring to confirm each family is present.
    price_feats = [f for f in fe.feature_names if 'return' in f or 'momentum' in f or 'body' in f]
    tech_feats = [f for f in fe.feature_names if 'rsi' in f or 'macd' in f or 'ema' in f or 'bb_' in f]
    vol_feats = [f for f in fe.feature_names if 'vol' in f.lower() and 'obv' not in f]
    regime_feats = [f for f in fe.feature_names if 'regime' in f or 'trend' in f or 'hurst' in f]

    test("Price features exist", len(price_feats) > 5, f"got {len(price_feats)}")
    test("Technical indicators exist", len(tech_feats) > 10, f"got {len(tech_feats)}")
    test("Volatility features exist", len(vol_feats) > 5, f"got {len(vol_feats)}")
    test("Regime features exist", len(regime_feats) > 2, f"got {len(regime_feats)}")

    # One direction column and one return column are expected per horizon.
    for h in [1, 5, 20]:
        test(f"Target direction_{h} exists", f'target_direction_{h}' in features.columns)
        test(f"Target return_{h} exists", f'target_return_{h}' in features.columns)
        vals = features[f'target_direction_{h}'].dropna().unique()
        test(f"Direction_{h} is binary", set(vals).issubset({0.0, 1.0}), f"got {vals[:5]}")

    # Normalisation.
    features_norm, norm_params = fe.normalize_features(features)
    test("Normalization produces params", len(norm_params) > 0)
    test("Normalized features have similar scale",
         abs(features_norm[fe.feature_names].mean().mean()) < 1.0,
         f"mean={features_norm[fe.feature_names].mean().mean():.4f}")

    # Target columns interleaved as (direction, return) per horizon; later
    # sections index y with i*2 / i*2+1 on the assumption this order is kept.
    target_cols = []
    for h in [1, 5, 20]:
        target_cols.extend([f'target_direction_{h}', f'target_return_{h}'])

    X, y = fe.create_sequences(features_norm, target_cols=target_cols)
    # Keep only samples whose inputs and targets are entirely finite.
    valid = np.isfinite(X).all(axis=(1, 2)) & np.isfinite(y).all(axis=1)
    X, y = X[valid], y[valid]

    test("Sequences created", X.shape[0] > 0)
    test("X shape correct (N, C, L)", len(X.shape) == 3)
    test("X channels match features", X.shape[1] == len(fe.feature_names),
         f"{X.shape[1]} vs {len(fe.feature_names)}")
    test("X lookback correct", X.shape[2] == 30, f"got {X.shape[2]}")
    test("Y targets = 6 (3 horizons × 2)", y.shape[1] == 6, f"got {y.shape[1]}")
    test("No NaN/Inf in X", np.isfinite(X).all())
    test("No NaN/Inf in y", np.isfinite(y).all())

    # Rule-based sentiment: the sign should follow the wording.
    se = SentimentFeatureEngine()
    score = se.compute_rule_based_sentiment("Stock upgraded, strong growth expected, bullish outlook")
    test("Sentiment positive for bullish text", score > 0, f"score={score:.3f}")
    score_neg = se.compute_rule_based_sentiment("Stock crashed, massive loss, bearish outlook")
    test("Sentiment negative for bearish text", score_neg < 0, f"score={score_neg:.3f}")

    print(f"\n 📊 Feature Summary: {len(fe.feature_names)} features, {X.shape[0]} samples")
    print(f" Feature names: {fe.feature_names[:10]}...")

except Exception as e:
    # Record the section as one failure and keep running the rest.
    test("Feature engine module", False, f"EXCEPTION: {e}")
    traceback.print_exc()
|
|
| |
| |
| |
# ----------------------------------------------------------------------
# TEST 2: each prediction-model component, then the full model, its loss
# and a backward pass. Uses `X` / `y` produced by TEST 1.
# ----------------------------------------------------------------------
section("TEST 2: PREDICTION MODEL")
try:
    from trading_intelligence.prediction_model import (
        PatchEmbedding, PositionalEncoding, MultiHeadAttention,
        TransformerBlock, ChannelMixer, PredictionHead,
        TradingTransformer, MultiTaskLoss
    )

    device = torch.device('cpu')
    num_channels = X.shape[1]
    batch_size = 16

    # PatchEmbedding
    pe = PatchEmbedding(patch_len=6, stride=3, d_model=64)
    test_input = torch.randn(batch_size, num_channels, 30)
    patches = pe(test_input)
    test("PatchEmbedding forward", patches.shape[0] == batch_size)
    test("PatchEmbedding output 4D", len(patches.shape) == 4)
    test("PatchEmbedding d_model=64", patches.shape[-1] == 64)

    # PositionalEncoding
    pos = PositionalEncoding(d_model=64)
    pos_out = pos(patches)
    test("PositionalEncoding same shape", pos_out.shape == patches.shape)

    # MultiHeadAttention
    mha = MultiHeadAttention(d_model=64, n_heads=4)
    attn_input = torch.randn(batch_size, 10, 64)
    attn_out = mha(attn_input)
    test("MHA forward", attn_out.shape == attn_input.shape)

    # TransformerBlock
    tb = TransformerBlock(d_model=64, n_heads=4, d_ff=128)
    tb_out = tb(attn_input)
    test("TransformerBlock forward", tb_out.shape == attn_input.shape)

    # ChannelMixer
    cm = ChannelMixer(num_channels=num_channels, d_model=64, n_heads=4)
    cm_out = cm(patches)
    test("ChannelMixer forward", cm_out.shape == patches.shape)

    # PredictionHead
    ph = PredictionHead(d_model=64, num_horizons=3)
    ph_input = torch.randn(batch_size, 64)
    ph_out = ph(ph_input)
    test("PredictionHead returns dict", isinstance(ph_out, dict))
    test("PredictionHead has direction_logits", 'direction_logits' in ph_out)
    test("PredictionHead has expected_return", 'expected_return' in ph_out)
    test("PredictionHead has log_variance", 'log_variance' in ph_out)
    test("Direction shape (B, 3)", ph_out['direction_logits'].shape == (batch_size, 3))

    # Full model
    model = TradingTransformer(
        num_channels=num_channels, seq_len=30, patch_len=6, stride=3,
        d_model=64, n_heads=4, n_layers=2, d_ff=128,
        num_horizons=3, dropout=0.1,
    ).to(device)

    param_count = sum(p.numel() for p in model.parameters())
    test("Model instantiates", True)
    test("Model has parameters", param_count > 0, f"{param_count:,} params")

    x_batch = torch.FloatTensor(X[:batch_size]).to(device)
    output = model(x_batch)
    test("Model forward pass", isinstance(output, dict))
    test("Output direction_logits shape", output['direction_logits'].shape == (batch_size, 3))
    test("Output expected_return shape", output['expected_return'].shape == (batch_size, 3))
    test("Output log_variance shape", output['log_variance'].shape == (batch_size, 3))
    test("No NaN in output", all(torch.isfinite(v).all().item() for v in output.values()))

    # Inference helper
    preds = model.predict_with_confidence(x_batch)
    test("predict_with_confidence returns dict", isinstance(preds, dict))
    test("direction_probs in [0,1]", (preds['direction_probs'] >= 0).all() and (preds['direction_probs'] <= 1).all())
    test("confidence in [0,1]", (preds['confidence'] >= 0).all() and (preds['confidence'] <= 1).all())

    # Loss: even y columns are taken as directions and odd columns as
    # returns (assumes y keeps the interleaved target order from TEST 1).
    loss_fn = MultiTaskLoss(num_horizons=3).to(device)
    y_batch = torch.FloatTensor(y[:batch_size]).to(device)
    directions = torch.stack([y_batch[:, i*2] for i in range(3)], dim=1)
    returns = torch.stack([y_batch[:, i*2+1] for i in range(3)], dim=1)
    targets = {'direction': directions, 'returns': returns}

    losses = loss_fn(output, targets)
    test("Loss computes", isinstance(losses, dict))
    test("total_loss is scalar", losses['total_loss'].dim() == 0)
    test("total_loss is finite", torch.isfinite(losses['total_loss']).item())
    test("direction_loss exists", 'direction_loss' in losses)
    test("return_loss exists", 'return_loss' in losses)
    test("risk_loss exists", 'risk_loss' in losses)

    # Backward pass: every trainable parameter must get a finite gradient.
    losses['total_loss'].backward()
    grads_ok = all(p.grad is not None and torch.isfinite(p.grad).all() for p in model.parameters() if p.requires_grad)
    test("Backward pass - gradients computed", grads_ok)

    print(f"\n 🧠 Model: {param_count:,} params, output verified across all heads")

except Exception as e:
    # Record the section as one failure and keep running the rest.
    test("Prediction model module", False, f"EXCEPTION: {e}")
    traceback.print_exc()
|
|
| |
| |
| |
# ----------------------------------------------------------------------
# TEST 3: short hand-written training loop, validation, save / reload.
# Rebinds `model` and defines X_test / y_test used by later sections.
# ----------------------------------------------------------------------
section("TEST 3: TRAINING LOOP")
try:
    from torch.utils.data import TensorDataset, DataLoader

    # Chronological 70 / 15 / 15 split (no shuffling across the boundaries).
    n = len(X)
    train_end = int(n * 0.7)
    val_end = int(n * 0.85)
    X_train, y_train = X[:train_end], y[:train_end]
    X_val, y_val = X[train_end:val_end], y[train_end:val_end]
    X_test, y_test = X[val_end:], y[val_end:]

    test("Train set size > 0", len(X_train) > 0, f"{len(X_train)}")
    test("Val set size > 0", len(X_val) > 0, f"{len(X_val)}")
    test("Test set size > 0", len(X_test) > 0, f"{len(X_test)}")

    # Fresh model so training starts from new weights.
    model = TradingTransformer(
        num_channels=num_channels, seq_len=30, patch_len=6, stride=3,
        d_model=64, n_heads=4, n_layers=2, d_ff=128,
        num_horizons=3, dropout=0.1,
    ).to(device)
    loss_fn = MultiTaskLoss(num_horizons=3).to(device)
    # The loss module's own parameters are optimised together with the model.
    optimizer = torch.optim.AdamW(
        list(model.parameters()) + list(loss_fn.parameters()),
        lr=1e-3, weight_decay=1e-4
    )

    train_loader = DataLoader(
        TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)),
        batch_size=128, shuffle=True
    )
    val_loader = DataLoader(
        TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)),
        batch_size=128, shuffle=False
    )

    # Per-epoch histories.
    train_losses = []
    val_losses = []
    val_accs = []

    for epoch in range(5):
        # --- training pass ---
        model.train()
        epoch_loss = 0
        n_batch = 0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            preds = model(xb)
            dirs = torch.stack([yb[:, i*2] for i in range(3)], dim=1)
            rets = torch.stack([yb[:, i*2+1] for i in range(3)], dim=1)
            loss_dict = loss_fn(preds, {'direction': dirs, 'returns': rets})
            optimizer.zero_grad()
            loss_dict['total_loss'].backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            epoch_loss += loss_dict['total_loss'].item()
            n_batch += 1
        train_losses.append(epoch_loss / n_batch)

        # --- validation pass ---
        model.eval()
        v_loss = 0
        v_batches = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for xb, yb in val_loader:
                xb, yb = xb.to(device), yb.to(device)
                preds = model(xb)
                dirs = torch.stack([yb[:, i*2] for i in range(3)], dim=1)
                rets = torch.stack([yb[:, i*2+1] for i in range(3)], dim=1)
                loss_dict = loss_fn(preds, {'direction': dirs, 'returns': rets})
                v_loss += loss_dict['total_loss'].item()
                v_batches += 1
                dir_preds = (torch.sigmoid(preds['direction_logits']) > 0.5).float()
                # Accuracy is tracked for the first horizon column only.
                correct += (dir_preds[:, 0] == dirs[:, 0]).sum().item()
                total += len(xb)
        val_losses.append(v_loss / v_batches)
        val_accs.append(correct / total)

        print(f" Epoch {epoch+1}: train_loss={train_losses[-1]:.4f} val_loss={val_losses[-1]:.4f} DA-1d={val_accs[-1]:.1%}")

    test("Training runs without error", True)
    test("Train loss decreases", train_losses[-1] < train_losses[0],
         f"{train_losses[0]:.4f} → {train_losses[-1]:.4f}")
    test("Val loss decreases", val_losses[-1] < val_losses[0],
         f"{val_losses[0]:.4f} → {val_losses[-1]:.4f}")
    test("Direction accuracy > random (40%)", val_accs[-1] > 0.40, f"{val_accs[-1]:.1%}")
    test("No NaN in losses", all(np.isfinite(l) for l in train_losses + val_losses))

    # Save / load round trip.
    os.makedirs('/app/models', exist_ok=True)
    save_path = '/app/models/test_model.pt'
    torch.save({
        'model_state': model.state_dict(),
        'config': {'num_channels': num_channels, 'd_model': 64, 'n_heads': 4,
                   'n_layers': 2, 'd_ff': 128, 'patch_len': 6, 'stride': 3}
    }, save_path)
    test("Model saves", os.path.exists(save_path))

    # NOTE: weights_only=False performs full unpickling; acceptable here only
    # because the file was written by this script a few lines above.
    checkpoint = torch.load(save_path, map_location='cpu', weights_only=False)
    model2 = TradingTransformer(
        num_channels=num_channels, seq_len=30, patch_len=6, stride=3,
        d_model=64, n_heads=4, n_layers=2, d_ff=128, num_horizons=3
    )
    model2.load_state_dict(checkpoint['model_state'])
    model2.eval()
    # `model` is still in eval mode from the last validation pass, so the two
    # forward passes are comparable.
    with torch.no_grad():
        out1 = model(torch.FloatTensor(X[:4]))
        out2 = model2(torch.FloatTensor(X[:4]))
    test("Saved/loaded model produces same output",
         torch.allclose(out1['direction_logits'], out2['direction_logits'], atol=1e-5))

    print(f"\n 📈 Training verified: loss {train_losses[0]:.4f} → {train_losses[-1]:.4f}")

except Exception as e:
    # Record the section as one failure and keep running the rest.
    test("Training loop", False, f"EXCEPTION: {e}")
    traceback.print_exc()
|
|
| |
| |
| |
# ----------------------------------------------------------------------
# TEST 4: portfolio encoder, behaviour analyser, risk model and risk loss,
# all driven with random tensors.
# ----------------------------------------------------------------------
section("TEST 4: RISK MODEL")
try:
    from trading_intelligence.risk_model import (
        PortfolioEncoder, TraderBehaviorAnalyzer, RiskModel, RiskLoss
    )

    B = 4  # batch size for every tensor in this section

    # PortfolioEncoder: 5 position slots, the last two masked out.
    pe = PortfolioEncoder(position_dim=8, max_positions=5, d_model=64)
    positions = torch.randn(B, 5, 8)
    account = torch.randn(B, 6)
    mask = torch.ones(B, 5, dtype=torch.bool)
    mask[:, 3:] = False

    port_repr = pe(positions, account, mask)
    test("PortfolioEncoder forward", port_repr.shape == (B, 64))
    test("PortfolioEncoder no NaN", torch.isfinite(port_repr).all())

    # TraderBehaviorAnalyzer
    tba = TraderBehaviorAnalyzer(trade_dim=12, d_model=64, n_layers=2)
    trade_hist = torch.randn(B, 20, 12)
    behavior = tba(trade_hist)
    test("BehaviorAnalyzer returns dict", isinstance(behavior, dict))
    test("risk_appetite shape", behavior['risk_appetite'].shape == (B,))
    test("risk_appetite in [0,1]", (behavior['risk_appetite'] >= 0).all() and (behavior['risk_appetite'] <= 1).all())
    test("overtrading_prob in [0,1]", (behavior['overtrading_prob'] >= 0).all() and (behavior['overtrading_prob'] <= 1).all())
    test("revenge_trading_prob in [0,1]", (behavior['revenge_trading_prob'] >= 0).all() and (behavior['revenge_trading_prob'] <= 1).all())
    test("trader_type_logits shape", behavior['trader_type_logits'].shape == (B, 5))
    test("behavior_embedding shape", behavior['behavior_embedding'].shape == (B, 64))

    # Full RiskModel, evaluated without gradients.
    rm = RiskModel(market_dim=64, portfolio_dim=64, behavior_dim=64)
    rm.eval()

    market_state = torch.randn(B, 64)
    with torch.no_grad():
        risk_out = rm(market_state, positions, account, trade_hist, mask)

    test("RiskModel returns dict", isinstance(risk_out, dict))
    test("risk_score shape", risk_out['risk_score'].shape == (B,))
    test("risk_score in [0,1]", (risk_out['risk_score'] >= 0).all() and (risk_out['risk_score'] <= 1).all())
    test("adjusted_position_size in [0,1]", (risk_out['adjusted_position_size'] >= 0).all() and (risk_out['adjusted_position_size'] <= 1).all())
    test("stop_loss_atr_mult >= 0", (risk_out['stop_loss_atr_mult'] >= 0).all())
    test("take_profit_atr_mult >= 0", (risk_out['take_profit_atr_mult'] >= 0).all())
    test("drawdown_probs shape", risk_out['drawdown_probs'].shape == (B, 4))
    test("drawdown_probs in [0,1]", (risk_out['drawdown_probs'] >= 0).all() and (risk_out['drawdown_probs'] <= 1).all())
    test("var_estimates shape", risk_out['var_estimates'].shape == (B, 3))
    test("behavior_profile in output", 'behavior_profile' in risk_out)

    # RiskLoss against random targets: only existence and finiteness checked.
    rl = RiskLoss()
    targets = {
        'actual_risk': torch.rand(B),
        'optimal_position_size': torch.rand(B),
        'drawdown_occurred': torch.rand(B, 4),
    }
    risk_losses = rl(risk_out, targets)
    test("RiskLoss computes", 'total_loss' in risk_losses)
    test("RiskLoss finite", torch.isfinite(risk_losses['total_loss']))

    print("\n 🛡️ Risk model: all outputs verified, shapes correct")

except Exception as e:
    # Record the section as one failure and keep running the rest.
    test("Risk model module", False, f"EXCEPTION: {e}")
    traceback.print_exc()
|
|
| |
| |
| |
# ----------------------------------------------------------------------
# TEST 5: trader profiling, behaviour alerts and personalised parameters,
# driven by hand-built trade histories.
# ----------------------------------------------------------------------
section("TEST 5: PERSONALIZATION")
try:
    from trading_intelligence.personalization import (
        TraderProfiler, BehaviorAlertSystem, PersonalizationEngine, TRADER_TYPES
    )

    test("5 trader types defined", len(TRADER_TYPES) == 5)

    profiler = TraderProfiler()

    # Conservative: small size, long holds, 20 wins / 8 losses.
    conservative_trades = [
        {'entry_price': 100, 'exit_price': 101, 'size': 0.01, 'pnl': 10, 'holding_time': 2880, 'direction': 1}
    ] * 20 + [
        {'entry_price': 100, 'exit_price': 99.5, 'size': 0.01, 'pnl': -5, 'holding_time': 1440, 'direction': 1}
    ] * 8

    feats = profiler.extract_behavior_features(conservative_trades)
    test("Feature extraction returns array", isinstance(feats, np.ndarray))
    test("15 behavior features", len(feats) == 15, f"got {len(feats)}")
    # Index 0 is compared against the win fraction of the list above.
    test("Win rate correct", abs(feats[0] - 20/28) < 0.01, f"got {feats[0]:.3f}")

    profile = profiler.predict_type(feats)
    test("Profile returns dict", isinstance(profile, dict))
    test("Profile has cluster", 'cluster' in profile)
    test("Profile has type_name", 'type_name' in profile)
    test("Conservative classified as Swing/Conservative", profile['type_name'] in ['Swing Trader', 'Conservative'])
    print(f" Conservative Carol → {profile['type_name']}")

    # Aggressive: large size, short holds, large swings.
    aggressive_trades = [
        {'entry_price': 100, 'exit_price': 105, 'size': 0.15, 'pnl': 750, 'holding_time': 60, 'direction': 1}
    ] * 12 + [
        {'entry_price': 100, 'exit_price': 93, 'size': 0.20, 'pnl': -1400, 'holding_time': 30, 'direction': 1}
    ] * 10

    agg_feats = profiler.extract_behavior_features(aggressive_trades)
    agg_profile = profiler.predict_type(agg_feats)
    test("Aggressive classified correctly", agg_profile['type_name'] in ['Aggressive', 'Moderate'])
    print(f" Aggressive Alex → {agg_profile['type_name']}")

    # Scalper: many trades with very short holding times.
    scalper_trades = [
        {'entry_price': 100, 'exit_price': 100.1, 'size': 0.03, 'pnl': 3, 'holding_time': 2, 'direction': 1}
    ] * 80 + [
        {'entry_price': 100, 'exit_price': 99.95, 'size': 0.03, 'pnl': -1.5, 'holding_time': 1, 'direction': -1}
    ] * 50

    scalp_feats = profiler.extract_behavior_features(scalper_trades)
    scalp_profile = profiler.predict_type(scalp_feats)
    test("Scalper classified correctly", scalp_profile['type_name'] == 'Scalper')
    print(f" Scalper Sam → {scalp_profile['type_name']}")

    # Edge case: no trades at all.
    empty_feats = profiler.extract_behavior_features([])
    test("Empty trades returns zeros", np.all(empty_feats == 0))

    # Behaviour alerts.
    alert_system = BehaviorAlertSystem()

    # Baseline: last five conservative trades.
    normal_alerts = alert_system.analyze(conservative_trades[-5:], 100000, 2.0)
    test("Normal status", normal_alerts['status'] in ['normal', 'warning'])

    # Overtrading: 15 one-minute trades.
    many_trades = [{'entry_price': 100, 'exit_price': 100.1, 'size': 0.01, 'pnl': 1, 'holding_time': 1, 'direction': 1}] * 15
    over_alerts = alert_system.analyze(many_trades, 100000, 1.0)
    test("Overtrading detected", any(a['type'] == 'OVERTRADING' for a in over_alerts['alerts']),
         f"alerts: {[a['type'] for a in over_alerts['alerts']]}")

    # Loss streak: five consecutive losing trades.
    loss_trades = [{'entry_price': 100, 'exit_price': 99, 'size': 0.05, 'pnl': -50, 'holding_time': 60, 'direction': 1}] * 5
    loss_alerts = alert_system.analyze(loss_trades, 100000, 1.0)
    test("Loss streak detected", any(a['type'] == 'LOSS_STREAK' for a in loss_alerts['alerts']))

    # Drawdown: three trades losing 20000 each, analysed with 100000.
    big_loss = [{'entry_price': 100, 'exit_price': 80, 'size': 0.2, 'pnl': -20000, 'holding_time': 30, 'direction': 1}] * 3
    dd_alerts = alert_system.analyze(big_loss, 100000, 1.0)
    test("Excessive drawdown detected", any(a['type'] == 'EXCESSIVE_DRAWDOWN' for a in dd_alerts['alerts']))
    test("Critical status on drawdown", dd_alerts['status'] == 'critical')

    # Personalised parameters for a calm conservative trader.
    engine = PersonalizationEngine()
    params = engine.get_personalized_params(
        {'cluster': 0, 'type_name': 'Conservative'},
        {'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'}
    )
    test("Personalization returns params", isinstance(params, dict))
    test("Conservative max_position <= 2%", params['max_position_pct'] <= 0.02)
    test("Conservative min_confidence >= 70%", params['min_confidence'] >= 0.7)

    # Same call for an aggressive trader with a critical revenge-trading alert.
    revenge_params = engine.get_personalized_params(
        {'cluster': 2, 'type_name': 'Aggressive'},
        {'alerts': [{'type': 'REVENGE_TRADING', 'severity': 'CRITICAL', 'message': 'test'}],
         'risk_multiplier': 0.3, 'status': 'critical'}
    )
    test("Revenge trading increases min_confidence", revenge_params['min_confidence'] > 0.55)
    test("Risk multiplier reduces position size", revenge_params['max_position_pct'] < 0.10)

    print("\n 👤 Personalization: all trader types, alerts, and adaptations verified")

except Exception as e:
    # Record the section as one failure and keep running the rest.
    test("Personalization module", False, f"EXCEPTION: {e}")
    traceback.print_exc()
|
|
| |
| |
| |
# ----------------------------------------------------------------------
# TEST 6: decision engine on random features. Uses whatever `model` is
# currently bound at module level, plus PersonalizationEngine from TEST 5.
# ----------------------------------------------------------------------
section("TEST 6: DECISION ENGINE")
try:
    from trading_intelligence.decision_engine import (
        DecisionEngine, Signal, TradingDecision, format_decision
    )

    engine = DecisionEngine(
        prediction_model=model,
        personalization_engine=PersonalizationEngine(),
    )

    # Single random sample shaped (1, num_channels, 30).
    test_features = np.random.randn(1, num_channels, 30).astype(np.float32)

    # Single-horizon decision with no behaviour alerts.
    decision = engine.make_decision(
        market_features=test_features,
        trader_profile={'cluster': 1, 'type_name': 'Moderate'},
        behavior_alerts={'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'},
        current_atr=0.015,
        horizon_idx=0,
    )

    test("Decision is TradingDecision", isinstance(decision, TradingDecision))
    test("Signal is valid enum", isinstance(decision.signal, Signal))
    test("Confidence in [0,1]", 0 <= decision.confidence <= 1, f"{decision.confidence:.3f}")
    test("Direction prob in [0,1]", 0 <= decision.direction_prob <= 1)
    test("Risk score in [0,1]", 0 <= decision.risk_score <= 1)
    test("Position size > 0", decision.position_size_pct > 0)
    test("Stop loss > 0", decision.stop_loss_pct > 0)
    test("Take profit > 0", decision.take_profit_pct > 0)
    test("Has reasoning", len(decision.reasoning) > 0)
    test("Has horizon label", decision.horizon in ['short_term', 'mid_term', 'long_term'])

    print(f" Decision: {decision.signal.value} (conf={decision.confidence:.1%})")

    # One decision per horizon.
    decisions = engine.make_multi_horizon_decisions(
        market_features=test_features,
        trader_profile={'cluster': 1, 'type_name': 'Moderate'},
        behavior_alerts={'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'},
        current_atr=0.015,
    )
    test("3 horizon decisions", len(decisions) == 3)
    test("Horizons are different",
         len(set(d.horizon for d in decisions)) == 3,
         f"{[d.horizon for d in decisions]}")

    for d in decisions:
        print(f" {d.horizon}: {d.signal.value} (conf={d.confidence:.1%}, dir={d.direction_prob:.1%})")

    # A critical behaviour alert is expected to override the model with HOLD.
    critical_decision = engine.make_decision(
        market_features=test_features,
        trader_profile={'cluster': 2, 'type_name': 'Aggressive'},
        behavior_alerts={
            'alerts': [{'type': 'REVENGE_TRADING', 'severity': 'CRITICAL', 'message': 'test'}],
            'risk_multiplier': 0.3, 'status': 'critical'
        },
        current_atr=0.015,
        horizon_idx=0,
    )
    test("Critical alert forces HOLD", critical_decision.signal == Signal.HOLD)
    test("Alert in reasoning", any('CRITICAL' in r for r in critical_decision.reasoning))

    # Human-readable rendering.
    formatted = format_decision(decision)
    test("format_decision returns string", isinstance(formatted, str))
    test("format_decision has signal", decision.signal.value in formatted)
    test("format_decision has confidence", 'Confidence' in formatted)

    # The engine must still produce a decision with no model attached.
    engine_no_model = DecisionEngine()
    default_decision = engine_no_model.make_decision(
        market_features=test_features,
        current_atr=0.015,
    )
    test("Works without model (defaults)", isinstance(default_decision, TradingDecision))

    print("\n 🎯 Decision engine: all signal types, alerts, formatting verified")

except Exception as e:
    # Record the section as one failure and keep running the rest.
    test("Decision engine module", False, f"EXCEPTION: {e}")
    traceback.print_exc()
|
|
| |
| |
| |
# ----------------------------------------------------------------------
# TEST 7: evaluation metrics on the held-out split. Relies on X_test /
# y_test, DataLoader and TensorDataset from TEST 3.
# ----------------------------------------------------------------------
section("TEST 7: EVALUATION & BACKTESTING")
try:
    from trading_intelligence.evaluation import Evaluator, format_evaluation

    evaluator = Evaluator(prediction_horizons=[1, 5, 20], trading_costs=0.001)
    test_loader = DataLoader(
        TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test)),
        batch_size=128, shuffle=False
    )

    eval_results = evaluator.evaluate_predictions(model, test_loader, device)

    test("Evaluation returns dict", isinstance(eval_results, dict))
    test("Has summary", 'summary' in eval_results)
    test("Has horizon_1", 'horizon_1' in eval_results)
    test("Has horizon_5", 'horizon_5' in eval_results)
    test("Has horizon_20", 'horizon_20' in eval_results)

    summary = eval_results['summary']
    test("num_test_samples > 0", summary['num_test_samples'] > 0)
    test("avg_direction_accuracy in [0,1]", 0 <= summary['avg_direction_accuracy'] <= 1)
    test("avg_ic is finite", np.isfinite(summary['avg_ic']))

    # Range / finiteness checks per horizon, followed by a one-line report.
    for h in [1, 5, 20]:
        hr = eval_results[f'horizon_{h}']
        test(f"H{h} direction_accuracy in [0,1]", 0 <= hr['direction_accuracy'] <= 1)
        test(f"H{h} sharpe_ratio is finite", np.isfinite(hr['sharpe_ratio']))
        test(f"H{h} max_drawdown in [0,1]", 0 <= hr['max_drawdown'] <= 1)
        test(f"H{h} profit_factor >= 0", hr['profit_factor'] >= 0)
        test(f"H{h} win_rate in [0,1]", 0 <= hr['win_rate'] <= 1)
        test(f"H{h} num_trades > 0", hr['num_trades'] > 0)
        print(f" H{h}: DA={hr['direction_accuracy']:.1%} IC={hr['information_coefficient']:.4f} "
              f"Sharpe={hr['sharpe_ratio']:.2f} DD={hr['max_drawdown']:.1%} PF={hr['profit_factor']:.2f}")

    # Human-readable rendering.
    formatted = format_evaluation(eval_results)
    test("format_evaluation returns string", isinstance(formatted, str))
    test("format_evaluation has content", len(formatted) > 100)

    print("\n 📊 Evaluation: all metrics computed and validated")

except Exception as e:
    # Record the section as one failure and keep running the rest.
    test("Evaluation module", False, f"EXCEPTION: {e}")
    traceback.print_exc()
|
|
| |
| |
| |
# ----------------------------------------------------------------------
# TEST 8: the high-level TrainingPipeline: prepare data, train briefly,
# save the model and load it into a second pipeline.
# ----------------------------------------------------------------------
section("TEST 8: TRAINING PIPELINE (high-level)")
try:
    from trading_intelligence.training import TrainingPipeline, FinancialTimeSeriesDataset

    # max_epochs=3 / patience=2 keep the run short.
    pipeline = TrainingPipeline(
        lookback_window=30,
        prediction_horizons=[1, 5, 20],
        d_model=64, n_heads=4, n_layers=2, d_ff=128,
        patch_len=6, stride=3, dropout=0.1,
        learning_rate=1e-3, batch_size=128,
        max_epochs=3, patience=2,
    )

    # NOTE: this rebinds the module-level train_loader / val_loader /
    # test_loader created in earlier sections.
    train_loader, val_loader, test_loader = pipeline.prepare_data(df)
    test("Pipeline prepare_data", True)
    test("Pipeline model initialized", pipeline.model is not None)
    test("Pipeline loss_fn initialized", pipeline.loss_fn is not None)

    results = pipeline.train(train_loader, val_loader)
    test("Pipeline train completes", 'best_val_loss' in results)
    test("Pipeline training history", len(results['history']) > 0)

    # Save, then load into a pipeline built with default model settings.
    pipeline.save_model('/app/models/pipeline_model.pt')
    test("Pipeline model saved", os.path.exists('/app/models/pipeline_model.pt'))

    pipeline2 = TrainingPipeline(lookback_window=30, prediction_horizons=[1,5,20])
    pipeline2.load_model('/app/models/pipeline_model.pt')
    test("Pipeline model loaded", pipeline2.model is not None)

    print("\n 🔧 Training pipeline: prepare, train, save, load all verified")

except Exception as e:
    # Record the section as one failure and keep running the rest.
    test("Training pipeline", False, f"EXCEPTION: {e}")
    traceback.print_exc()
|
|
| |
| |
| |
# ----------------------------------------------------------------------
# INTEGRATION: chain every component end to end on the synthetic data.
# Depends on classes imported inside the earlier per-module sections.
# ----------------------------------------------------------------------
section("INTEGRATION TEST: FULL PIPELINE")
try:
    print(" Running complete end-to-end flow...")

    # Stage 1: raw OHLCV -> normalised feature table.
    feature_engine = FeatureEngine(lookback_window=30, prediction_horizons=[1, 5, 20])
    features = feature_engine.compute_all_features(df)
    features_norm, _ = feature_engine.normalize_features(features)

    # Stage 2: windowed samples, keeping only fully finite rows.
    target_cols = [
        column
        for horizon in [1, 5, 20]
        for column in (f'target_direction_{horizon}', f'target_return_{horizon}')
    ]
    X_all, y_all = feature_engine.create_sequences(features_norm, target_cols=target_cols)
    finite_rows = np.isfinite(X_all).all(axis=(1, 2)) & np.isfinite(y_all).all(axis=1)
    X_all = X_all[finite_rows]
    y_all = y_all[finite_rows]

    # Stage 3: a freshly constructed (never trained) model is used to
    # exercise the inference path on the most recent window.
    model_final = TradingTransformer(
        num_channels=X_all.shape[1], seq_len=30, patch_len=6, stride=3,
        d_model=64, n_heads=4, n_layers=2, d_ff=128, num_horizons=3
    )
    model_final.eval()
    with torch.no_grad():
        latest_window = torch.FloatTensor(X_all[-1:])
        pred = model_final.predict_with_confidence(latest_window)

    # Stage 4: risk model is constructed and switched to eval mode only.
    rm = RiskModel(market_dim=64, portfolio_dim=64, behavior_dim=64)
    rm.eval()

    # Stage 5: profile a trader with 15 winning and 8 losing trades.
    profiler = TraderProfiler()
    alert_system = BehaviorAlertSystem()
    pers = PersonalizationEngine()

    winning_trade = {'entry_price': 100, 'exit_price': 101.5, 'size': 0.05, 'pnl': 75, 'holding_time': 120, 'direction': 1}
    losing_trade = {'entry_price': 100, 'exit_price': 99, 'size': 0.05, 'pnl': -50, 'holding_time': 60, 'direction': -1}
    sample_trades = [winning_trade] * 15 + [losing_trade] * 8

    trader_feats = profiler.extract_behavior_features(sample_trades)
    trader_profile = profiler.predict_type(trader_feats)
    alerts = alert_system.analyze(sample_trades[-5:], 100000, 1.0)

    # Stage 6: combine prediction, profile and alerts into a decision for
    # horizon index 1.
    decision_engine = DecisionEngine(
        prediction_model=model_final,
        personalization_engine=pers,
    )
    final_decision = decision_engine.make_decision(
        market_features=X_all[-1:],
        trader_profile=trader_profile,
        behavior_alerts=alerts,
        current_atr=0.015,
        horizon_idx=1,
    )

    test("Integration: features computed", len(features) > 0)
    test("Integration: sequences created", X_all.shape[0] > 0)
    test("Integration: prediction made", pred['direction_probs'].shape == (1, 3))
    test("Integration: trader profiled", trader_profile['type_name'] in TRADER_TYPES.values())
    test("Integration: decision generated", isinstance(final_decision.signal, Signal))

    print(f"\n Full pipeline output:")
    print(format_decision(final_decision))

    test("Integration: complete pipeline works", True)

except Exception as e:
    # Record the section as one failure and still reach the summary.
    test("Integration test", False, f"EXCEPTION: {e}")
    traceback.print_exc()
|
|
| |
| |
| |
# ----------------------------------------------------------------------
# TEST SUMMARY: totals accumulated by test() across all sections.
# ----------------------------------------------------------------------
section("TEST SUMMARY")
total = PASS + FAIL
# Guard the percentage so an empty run reports 0.0% instead of raising
# ZeroDivisionError.
pass_rate = PASS / total * 100 if total else 0.0
print(f"\n ✅ Passed: {PASS}/{total}")
print(f" ❌ Failed: {FAIL}/{total}")
print(f" Pass Rate: {pass_rate:.1f}%")

if FAIL == 0:
    print("\n 🎉 ALL TESTS PASSED!")
else:
    print(f"\n ⚠️ {FAIL} test(s) need attention")

print(f"\n{'='*70}")
|
|