avinashhm's picture
Fix: test_all.py - all 174 tests passing
0e2300a verified
#!/usr/bin/env python3
"""
Comprehensive Test Suite for Trading Intelligence System
=========================================================
Tests each module independently, then runs full integration.
Every assertion is checked, every output is validated.
"""
import sys, os, time, traceback, warnings
warnings.filterwarnings('ignore')
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', buffering=1)
sys.path.insert(0, '/app')
import numpy as np
import pandas as pd
import torch
PASS = 0
FAIL = 0
def test(name, condition, detail=""):
global PASS, FAIL
if condition:
PASS += 1
print(f" βœ… {name}")
else:
FAIL += 1
print(f" ❌ {name} β€” {detail}")
def section(title):
print(f"\n{'='*70}")
print(f" {title}")
print(f"{'='*70}")
# ═══════════════════════════════════════════════════════
# GENERATE SYNTHETIC DATA (shared across all tests)
# ═══════════════════════════════════════════════════════
section("DATA GENERATION")
np.random.seed(42)
num_days = 1500
dt = 1/252
prices = [150.0]
vol = 0.20
for i in range(num_days - 1):
vol = vol + 0.1 * (0.20 - vol) * dt + 0.3 * np.sqrt(dt) * np.random.normal()
vol = max(vol, 0.05)
ret = (0.08 - 0.5 * vol**2) * dt + vol * np.sqrt(dt) * np.random.normal()
prices.append(prices[-1] * np.exp(ret))
df = pd.DataFrame({
'date': pd.date_range('2019-01-02', periods=num_days, freq='B')[:num_days],
'open': [p * (1 + np.random.normal(0, 0.002)) for p in prices],
'high': [p * (1 + abs(np.random.normal(0, 0.01))) for p in prices],
'low': [p * (1 - abs(np.random.normal(0, 0.01))) for p in prices],
'close': prices,
'volume': [int(1e6 * np.exp(np.random.normal(0, 0.3))) for _ in range(num_days)],
})
df['high'] = df[['open', 'high', 'close']].max(axis=1) * (1 + abs(np.random.normal(0, 0.002, num_days)))
df['low'] = df[['open', 'low', 'close']].min(axis=1) * (1 - abs(np.random.normal(0, 0.002, num_days)))
test("Data generated", len(df) == num_days, f"got {len(df)}")
test("OHLCV columns present", all(c in df.columns for c in ['open','high','low','close','volume']))
test("No NaN in raw data", df[['open','high','low','close','volume']].isna().sum().sum() == 0)
test("High >= Low", (df['high'] >= df['low']).all())
test("High >= Close", (df['high'] >= df['close']).all())
test("Low <= Close", (df['low'] <= df['close']).all())
print(f" Price range: ${min(prices):.2f} β€” ${max(prices):.2f}")
# ═══════════════════════════════════════════════════════
# TEST 1: FEATURE ENGINE
# ═══════════════════════════════════════════════════════
section("TEST 1: FEATURE ENGINE")
try:
from trading_intelligence.feature_engine import FeatureEngine, SentimentFeatureEngine
fe = FeatureEngine(lookback_window=30, prediction_horizons=[1, 5, 20])
features = fe.compute_all_features(df)
test("Feature engine initializes", True)
test("Features computed", len(features) > 0, f"got {len(features)} rows")
test("No NaN after dropna", features[fe.feature_names].isna().sum().sum() == 0)
test("Feature count >= 60", len(fe.feature_names) >= 60, f"got {len(fe.feature_names)}")
# Check specific feature groups
price_feats = [f for f in fe.feature_names if 'return' in f or 'momentum' in f or 'body' in f]
tech_feats = [f for f in fe.feature_names if 'rsi' in f or 'macd' in f or 'ema' in f or 'bb_' in f]
vol_feats = [f for f in fe.feature_names if 'vol' in f.lower() and 'obv' not in f]
regime_feats = [f for f in fe.feature_names if 'regime' in f or 'trend' in f or 'hurst' in f]
test("Price features exist", len(price_feats) > 5, f"got {len(price_feats)}")
test("Technical indicators exist", len(tech_feats) > 10, f"got {len(tech_feats)}")
test("Volatility features exist", len(vol_feats) > 5, f"got {len(vol_feats)}")
test("Regime features exist", len(regime_feats) > 2, f"got {len(regime_feats)}")
# Check targets
for h in [1, 5, 20]:
test(f"Target direction_{h} exists", f'target_direction_{h}' in features.columns)
test(f"Target return_{h} exists", f'target_return_{h}' in features.columns)
vals = features[f'target_direction_{h}'].dropna().unique()
test(f"Direction_{h} is binary", set(vals).issubset({0.0, 1.0}), f"got {vals[:5]}")
# Normalization
features_norm, norm_params = fe.normalize_features(features)
test("Normalization produces params", len(norm_params) > 0)
test("Normalized features have similar scale",
abs(features_norm[fe.feature_names].mean().mean()) < 1.0,
f"mean={features_norm[fe.feature_names].mean().mean():.4f}")
# Sequence creation
target_cols = []
for h in [1, 5, 20]:
target_cols.extend([f'target_direction_{h}', f'target_return_{h}'])
X, y = fe.create_sequences(features_norm, target_cols=target_cols)
valid = np.isfinite(X).all(axis=(1, 2)) & np.isfinite(y).all(axis=1)
X, y = X[valid], y[valid]
test("Sequences created", X.shape[0] > 0)
test("X shape correct (N, C, L)", len(X.shape) == 3)
test("X channels match features", X.shape[1] == len(fe.feature_names),
f"{X.shape[1]} vs {len(fe.feature_names)}")
test("X lookback correct", X.shape[2] == 30, f"got {X.shape[2]}")
test("Y targets = 6 (3 horizons Γ— 2)", y.shape[1] == 6, f"got {y.shape[1]}")
test("No NaN/Inf in X", np.isfinite(X).all())
test("No NaN/Inf in y", np.isfinite(y).all())
# Sentiment engine
se = SentimentFeatureEngine()
score = se.compute_rule_based_sentiment("Stock upgraded, strong growth expected, bullish outlook")
test("Sentiment positive for bullish text", score > 0, f"score={score:.3f}")
score_neg = se.compute_rule_based_sentiment("Stock crashed, massive loss, bearish outlook")
test("Sentiment negative for bearish text", score_neg < 0, f"score={score_neg:.3f}")
print(f"\n πŸ“Š Feature Summary: {len(fe.feature_names)} features, {X.shape[0]} samples")
print(f" Feature names: {fe.feature_names[:10]}...")
except Exception as e:
test("Feature engine module", False, f"EXCEPTION: {e}")
traceback.print_exc()
# ═══════════════════════════════════════════════════════
# TEST 2: PREDICTION MODEL
# ═══════════════════════════════════════════════════════
section("TEST 2: PREDICTION MODEL")
try:
from trading_intelligence.prediction_model import (
PatchEmbedding, PositionalEncoding, MultiHeadAttention,
TransformerBlock, ChannelMixer, PredictionHead,
TradingTransformer, MultiTaskLoss
)
device = torch.device('cpu')
num_channels = X.shape[1]
batch_size = 16
# Test PatchEmbedding
pe = PatchEmbedding(patch_len=6, stride=3, d_model=64)
test_input = torch.randn(batch_size, num_channels, 30)
patches = pe(test_input)
test("PatchEmbedding forward", patches.shape[0] == batch_size)
test("PatchEmbedding output 4D", len(patches.shape) == 4)
test("PatchEmbedding d_model=64", patches.shape[-1] == 64)
# Test PositionalEncoding
pos = PositionalEncoding(d_model=64)
pos_out = pos(patches)
test("PositionalEncoding same shape", pos_out.shape == patches.shape)
# Test MultiHeadAttention
mha = MultiHeadAttention(d_model=64, n_heads=4)
attn_input = torch.randn(batch_size, 10, 64)
attn_out = mha(attn_input)
test("MHA forward", attn_out.shape == attn_input.shape)
# Test TransformerBlock
tb = TransformerBlock(d_model=64, n_heads=4, d_ff=128)
tb_out = tb(attn_input)
test("TransformerBlock forward", tb_out.shape == attn_input.shape)
# Test ChannelMixer
cm = ChannelMixer(num_channels=num_channels, d_model=64, n_heads=4)
cm_out = cm(patches)
test("ChannelMixer forward", cm_out.shape == patches.shape)
# Test PredictionHead
ph = PredictionHead(d_model=64, num_horizons=3)
ph_input = torch.randn(batch_size, 64)
ph_out = ph(ph_input)
test("PredictionHead returns dict", isinstance(ph_out, dict))
test("PredictionHead has direction_logits", 'direction_logits' in ph_out)
test("PredictionHead has expected_return", 'expected_return' in ph_out)
test("PredictionHead has log_variance", 'log_variance' in ph_out)
test("Direction shape (B, 3)", ph_out['direction_logits'].shape == (batch_size, 3))
# Test Full TradingTransformer
model = TradingTransformer(
num_channels=num_channels, seq_len=30, patch_len=6, stride=3,
d_model=64, n_heads=4, n_layers=2, d_ff=128,
num_horizons=3, dropout=0.1,
).to(device)
param_count = sum(p.numel() for p in model.parameters())
test("Model instantiates", True)
test("Model has parameters", param_count > 0, f"{param_count:,} params")
x_batch = torch.FloatTensor(X[:batch_size]).to(device)
output = model(x_batch)
test("Model forward pass", isinstance(output, dict))
test("Output direction_logits shape", output['direction_logits'].shape == (batch_size, 3))
test("Output expected_return shape", output['expected_return'].shape == (batch_size, 3))
test("Output log_variance shape", output['log_variance'].shape == (batch_size, 3))
test("No NaN in output", all(torch.isfinite(v).all().item() for v in output.values()))
# Test predict_with_confidence
preds = model.predict_with_confidence(x_batch)
test("predict_with_confidence returns dict", isinstance(preds, dict))
test("direction_probs in [0,1]", (preds['direction_probs'] >= 0).all() and (preds['direction_probs'] <= 1).all())
test("confidence in [0,1]", (preds['confidence'] >= 0).all() and (preds['confidence'] <= 1).all())
# Test MultiTaskLoss
loss_fn = MultiTaskLoss(num_horizons=3).to(device)
y_batch = torch.FloatTensor(y[:batch_size]).to(device)
directions = torch.stack([y_batch[:, i*2] for i in range(3)], dim=1)
returns = torch.stack([y_batch[:, i*2+1] for i in range(3)], dim=1)
targets = {'direction': directions, 'returns': returns}
losses = loss_fn(output, targets)
test("Loss computes", isinstance(losses, dict))
test("total_loss is scalar", losses['total_loss'].dim() == 0)
test("total_loss is finite", torch.isfinite(losses['total_loss']).item())
test("direction_loss exists", 'direction_loss' in losses)
test("return_loss exists", 'return_loss' in losses)
test("risk_loss exists", 'risk_loss' in losses)
# Test backward pass
losses['total_loss'].backward()
grads_ok = all(p.grad is not None and torch.isfinite(p.grad).all() for p in model.parameters() if p.requires_grad)
test("Backward pass - gradients computed", grads_ok)
print(f"\n 🧠 Model: {param_count:,} params, output verified across all heads")
except Exception as e:
test("Prediction model module", False, f"EXCEPTION: {e}")
traceback.print_exc()
# ═══════════════════════════════════════════════════════
# TEST 3: TRAINING LOOP (abbreviated)
# ═══════════════════════════════════════════════════════
section("TEST 3: TRAINING LOOP")
try:
from torch.utils.data import TensorDataset, DataLoader
# Split data
n = len(X)
train_end = int(n * 0.7)
val_end = int(n * 0.85)
X_train, y_train = X[:train_end], y[:train_end]
X_val, y_val = X[train_end:val_end], y[train_end:val_end]
X_test, y_test = X[val_end:], y[val_end:]
test("Train set size > 0", len(X_train) > 0, f"{len(X_train)}")
test("Val set size > 0", len(X_val) > 0, f"{len(X_val)}")
test("Test set size > 0", len(X_test) > 0, f"{len(X_test)}")
# Re-init model fresh
model = TradingTransformer(
num_channels=num_channels, seq_len=30, patch_len=6, stride=3,
d_model=64, n_heads=4, n_layers=2, d_ff=128,
num_horizons=3, dropout=0.1,
).to(device)
loss_fn = MultiTaskLoss(num_horizons=3).to(device)
optimizer = torch.optim.AdamW(
list(model.parameters()) + list(loss_fn.parameters()),
lr=1e-3, weight_decay=1e-4
)
train_loader = DataLoader(
TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)),
batch_size=128, shuffle=True
)
val_loader = DataLoader(
TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)),
batch_size=128, shuffle=False
)
# Train 5 epochs and check loss decreases
train_losses = []
val_losses = []
val_accs = []
for epoch in range(5):
# Train
model.train()
epoch_loss = 0
n_batch = 0
for xb, yb in train_loader:
xb, yb = xb.to(device), yb.to(device)
preds = model(xb)
dirs = torch.stack([yb[:, i*2] for i in range(3)], dim=1)
rets = torch.stack([yb[:, i*2+1] for i in range(3)], dim=1)
loss_dict = loss_fn(preds, {'direction': dirs, 'returns': rets})
optimizer.zero_grad()
loss_dict['total_loss'].backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
epoch_loss += loss_dict['total_loss'].item()
n_batch += 1
train_losses.append(epoch_loss / n_batch)
# Validate
model.eval()
v_loss = 0
v_batches = 0
correct = 0
total = 0
with torch.no_grad():
for xb, yb in val_loader:
xb, yb = xb.to(device), yb.to(device)
preds = model(xb)
dirs = torch.stack([yb[:, i*2] for i in range(3)], dim=1)
rets = torch.stack([yb[:, i*2+1] for i in range(3)], dim=1)
loss_dict = loss_fn(preds, {'direction': dirs, 'returns': rets})
v_loss += loss_dict['total_loss'].item()
v_batches += 1
dir_preds = (torch.sigmoid(preds['direction_logits']) > 0.5).float()
correct += (dir_preds[:, 0] == dirs[:, 0]).sum().item()
total += len(xb)
val_losses.append(v_loss / v_batches)
val_accs.append(correct / total)
print(f" Epoch {epoch+1}: train_loss={train_losses[-1]:.4f} val_loss={val_losses[-1]:.4f} DA-1d={val_accs[-1]:.1%}")
test("Training runs without error", True)
test("Train loss decreases", train_losses[-1] < train_losses[0],
f"{train_losses[0]:.4f} β†’ {train_losses[-1]:.4f}")
test("Val loss decreases", val_losses[-1] < val_losses[0],
f"{val_losses[0]:.4f} β†’ {val_losses[-1]:.4f}")
test("Direction accuracy > random (40%)", val_accs[-1] > 0.40, f"{val_accs[-1]:.1%}")
test("No NaN in losses", all(np.isfinite(l) for l in train_losses + val_losses))
# Save and reload
os.makedirs('/app/models', exist_ok=True)
save_path = '/app/models/test_model.pt'
torch.save({
'model_state': model.state_dict(),
'config': {'num_channels': num_channels, 'd_model': 64, 'n_heads': 4,
'n_layers': 2, 'd_ff': 128, 'patch_len': 6, 'stride': 3}
}, save_path)
test("Model saves", os.path.exists(save_path))
checkpoint = torch.load(save_path, map_location='cpu', weights_only=False)
model2 = TradingTransformer(
num_channels=num_channels, seq_len=30, patch_len=6, stride=3,
d_model=64, n_heads=4, n_layers=2, d_ff=128, num_horizons=3
)
model2.load_state_dict(checkpoint['model_state'])
model2.eval()
with torch.no_grad():
out1 = model(torch.FloatTensor(X[:4]))
out2 = model2(torch.FloatTensor(X[:4]))
test("Saved/loaded model produces same output",
torch.allclose(out1['direction_logits'], out2['direction_logits'], atol=1e-5))
print(f"\n πŸ“ˆ Training verified: loss {train_losses[0]:.4f} β†’ {train_losses[-1]:.4f}")
except Exception as e:
test("Training loop", False, f"EXCEPTION: {e}")
traceback.print_exc()
# ═══════════════════════════════════════════════════════
# TEST 4: RISK MODEL
# ═══════════════════════════════════════════════════════
section("TEST 4: RISK MODEL")
try:
from trading_intelligence.risk_model import (
PortfolioEncoder, TraderBehaviorAnalyzer, RiskModel, RiskLoss
)
B = 4
# Test PortfolioEncoder
pe = PortfolioEncoder(position_dim=8, max_positions=5, d_model=64)
positions = torch.randn(B, 5, 8)
account = torch.randn(B, 6)
mask = torch.ones(B, 5, dtype=torch.bool)
mask[:, 3:] = False
port_repr = pe(positions, account, mask)
test("PortfolioEncoder forward", port_repr.shape == (B, 64))
test("PortfolioEncoder no NaN", torch.isfinite(port_repr).all())
# Test TraderBehaviorAnalyzer
tba = TraderBehaviorAnalyzer(trade_dim=12, d_model=64, n_layers=2)
trade_hist = torch.randn(B, 20, 12)
behavior = tba(trade_hist)
test("BehaviorAnalyzer returns dict", isinstance(behavior, dict))
test("risk_appetite shape", behavior['risk_appetite'].shape == (B,))
test("risk_appetite in [0,1]", (behavior['risk_appetite'] >= 0).all() and (behavior['risk_appetite'] <= 1).all())
test("overtrading_prob in [0,1]", (behavior['overtrading_prob'] >= 0).all() and (behavior['overtrading_prob'] <= 1).all())
test("revenge_trading_prob in [0,1]", (behavior['revenge_trading_prob'] >= 0).all() and (behavior['revenge_trading_prob'] <= 1).all())
test("trader_type_logits shape", behavior['trader_type_logits'].shape == (B, 5))
test("behavior_embedding shape", behavior['behavior_embedding'].shape == (B, 64))
# Test Full RiskModel
rm = RiskModel(market_dim=64, portfolio_dim=64, behavior_dim=64)
rm.eval()
market_state = torch.randn(B, 64)
with torch.no_grad():
risk_out = rm(market_state, positions, account, trade_hist, mask)
test("RiskModel returns dict", isinstance(risk_out, dict))
test("risk_score shape", risk_out['risk_score'].shape == (B,))
test("risk_score in [0,1]", (risk_out['risk_score'] >= 0).all() and (risk_out['risk_score'] <= 1).all())
test("adjusted_position_size in [0,1]", (risk_out['adjusted_position_size'] >= 0).all() and (risk_out['adjusted_position_size'] <= 1).all())
test("stop_loss_atr_mult >= 0", (risk_out['stop_loss_atr_mult'] >= 0).all())
test("take_profit_atr_mult >= 0", (risk_out['take_profit_atr_mult'] >= 0).all())
test("drawdown_probs shape", risk_out['drawdown_probs'].shape == (B, 4))
test("drawdown_probs in [0,1]", (risk_out['drawdown_probs'] >= 0).all() and (risk_out['drawdown_probs'] <= 1).all())
test("var_estimates shape", risk_out['var_estimates'].shape == (B, 3))
test("behavior_profile in output", 'behavior_profile' in risk_out)
# Test RiskLoss
rl = RiskLoss()
targets = {
'actual_risk': torch.rand(B),
'optimal_position_size': torch.rand(B),
'drawdown_occurred': torch.rand(B, 4),
}
risk_losses = rl(risk_out, targets)
test("RiskLoss computes", 'total_loss' in risk_losses)
test("RiskLoss finite", torch.isfinite(risk_losses['total_loss']))
print(f"\n πŸ›‘οΈ Risk model: all outputs verified, shapes correct")
except Exception as e:
test("Risk model module", False, f"EXCEPTION: {e}")
traceback.print_exc()
# ═══════════════════════════════════════════════════════
# TEST 5: PERSONALIZATION
# ═══════════════════════════════════════════════════════
section("TEST 5: PERSONALIZATION")
try:
from trading_intelligence.personalization import (
TraderProfiler, BehaviorAlertSystem, PersonalizationEngine, TRADER_TYPES
)
test("5 trader types defined", len(TRADER_TYPES) == 5)
profiler = TraderProfiler()
# Test with conservative trader
conservative_trades = [
{'entry_price': 100, 'exit_price': 101, 'size': 0.01, 'pnl': 10, 'holding_time': 2880, 'direction': 1}
] * 20 + [
{'entry_price': 100, 'exit_price': 99.5, 'size': 0.01, 'pnl': -5, 'holding_time': 1440, 'direction': 1}
] * 8
feats = profiler.extract_behavior_features(conservative_trades)
test("Feature extraction returns array", isinstance(feats, np.ndarray))
test("15 behavior features", len(feats) == 15, f"got {len(feats)}")
test("Win rate correct", abs(feats[0] - 20/28) < 0.01, f"got {feats[0]:.3f}")
profile = profiler.predict_type(feats)
test("Profile returns dict", isinstance(profile, dict))
test("Profile has cluster", 'cluster' in profile)
test("Profile has type_name", 'type_name' in profile)
test("Conservative classified as Swing/Conservative", profile['type_name'] in ['Swing Trader', 'Conservative'])
print(f" Conservative Carol β†’ {profile['type_name']}")
# Test aggressive trader
aggressive_trades = [
{'entry_price': 100, 'exit_price': 105, 'size': 0.15, 'pnl': 750, 'holding_time': 60, 'direction': 1}
] * 12 + [
{'entry_price': 100, 'exit_price': 93, 'size': 0.20, 'pnl': -1400, 'holding_time': 30, 'direction': 1}
] * 10
agg_feats = profiler.extract_behavior_features(aggressive_trades)
agg_profile = profiler.predict_type(agg_feats)
test("Aggressive classified correctly", agg_profile['type_name'] in ['Aggressive', 'Moderate'])
print(f" Aggressive Alex β†’ {agg_profile['type_name']}")
# Test scalper
scalper_trades = [
{'entry_price': 100, 'exit_price': 100.1, 'size': 0.03, 'pnl': 3, 'holding_time': 2, 'direction': 1}
] * 80 + [
{'entry_price': 100, 'exit_price': 99.95, 'size': 0.03, 'pnl': -1.5, 'holding_time': 1, 'direction': -1}
] * 50
scalp_feats = profiler.extract_behavior_features(scalper_trades)
scalp_profile = profiler.predict_type(scalp_feats)
test("Scalper classified correctly", scalp_profile['type_name'] == 'Scalper')
print(f" Scalper Sam β†’ {scalp_profile['type_name']}")
# Test empty trades
empty_feats = profiler.extract_behavior_features([])
test("Empty trades returns zeros", np.all(empty_feats == 0))
# Test BehaviorAlertSystem
alert_system = BehaviorAlertSystem()
# Normal situation
normal_alerts = alert_system.analyze(conservative_trades[-5:], 100000, 2.0)
test("Normal status", normal_alerts['status'] in ['normal', 'warning'])
# Overtrading scenario (10+ trades in 1 hour)
many_trades = [{'entry_price': 100, 'exit_price': 100.1, 'size': 0.01, 'pnl': 1, 'holding_time': 1, 'direction': 1}] * 15
over_alerts = alert_system.analyze(many_trades, 100000, 1.0)
test("Overtrading detected", any(a['type'] == 'OVERTRADING' for a in over_alerts['alerts']),
f"alerts: {[a['type'] for a in over_alerts['alerts']]}")
# Loss streak scenario
loss_trades = [{'entry_price': 100, 'exit_price': 99, 'size': 0.05, 'pnl': -50, 'holding_time': 60, 'direction': 1}] * 5
loss_alerts = alert_system.analyze(loss_trades, 100000, 1.0)
test("Loss streak detected", any(a['type'] == 'LOSS_STREAK' for a in loss_alerts['alerts']))
# Excessive drawdown
big_loss = [{'entry_price': 100, 'exit_price': 80, 'size': 0.2, 'pnl': -20000, 'holding_time': 30, 'direction': 1}] * 3
dd_alerts = alert_system.analyze(big_loss, 100000, 1.0)
test("Excessive drawdown detected", any(a['type'] == 'EXCESSIVE_DRAWDOWN' for a in dd_alerts['alerts']))
test("Critical status on drawdown", dd_alerts['status'] == 'critical')
# Test PersonalizationEngine
engine = PersonalizationEngine()
params = engine.get_personalized_params(
{'cluster': 0, 'type_name': 'Conservative'},
{'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'}
)
test("Personalization returns params", isinstance(params, dict))
test("Conservative max_position <= 2%", params['max_position_pct'] <= 0.02)
test("Conservative min_confidence >= 70%", params['min_confidence'] >= 0.7)
# With revenge trading alert
revenge_params = engine.get_personalized_params(
{'cluster': 2, 'type_name': 'Aggressive'},
{'alerts': [{'type': 'REVENGE_TRADING', 'severity': 'CRITICAL', 'message': 'test'}],
'risk_multiplier': 0.3, 'status': 'critical'}
)
test("Revenge trading increases min_confidence", revenge_params['min_confidence'] > 0.55)
test("Risk multiplier reduces position size", revenge_params['max_position_pct'] < 0.10)
print(f"\n πŸ‘€ Personalization: all trader types, alerts, and adaptations verified")
except Exception as e:
test("Personalization module", False, f"EXCEPTION: {e}")
traceback.print_exc()
# ═══════════════════════════════════════════════════════
# TEST 6: DECISION ENGINE
# ═══════════════════════════════════════════════════════
section("TEST 6: DECISION ENGINE")
try:
from trading_intelligence.decision_engine import (
DecisionEngine, Signal, TradingDecision, format_decision
)
engine = DecisionEngine(
prediction_model=model,
personalization_engine=PersonalizationEngine(),
)
test_features = np.random.randn(1, num_channels, 30).astype(np.float32)
# Basic decision
decision = engine.make_decision(
market_features=test_features,
trader_profile={'cluster': 1, 'type_name': 'Moderate'},
behavior_alerts={'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'},
current_atr=0.015,
horizon_idx=0,
)
test("Decision is TradingDecision", isinstance(decision, TradingDecision))
test("Signal is valid enum", isinstance(decision.signal, Signal))
test("Confidence in [0,1]", 0 <= decision.confidence <= 1, f"{decision.confidence:.3f}")
test("Direction prob in [0,1]", 0 <= decision.direction_prob <= 1)
test("Risk score in [0,1]", 0 <= decision.risk_score <= 1)
test("Position size > 0", decision.position_size_pct > 0)
test("Stop loss > 0", decision.stop_loss_pct > 0)
test("Take profit > 0", decision.take_profit_pct > 0)
test("Has reasoning", len(decision.reasoning) > 0)
test("Has horizon label", decision.horizon in ['short_term', 'mid_term', 'long_term'])
print(f" Decision: {decision.signal.value} (conf={decision.confidence:.1%})")
# Multi-horizon decisions
decisions = engine.make_multi_horizon_decisions(
market_features=test_features,
trader_profile={'cluster': 1, 'type_name': 'Moderate'},
behavior_alerts={'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'},
current_atr=0.015,
)
test("3 horizon decisions", len(decisions) == 3)
test("Horizons are different",
len(set(d.horizon for d in decisions)) == 3,
f"{[d.horizon for d in decisions]}")
for d in decisions:
print(f" {d.horizon}: {d.signal.value} (conf={d.confidence:.1%}, dir={d.direction_prob:.1%})")
# Critical alert override
critical_decision = engine.make_decision(
market_features=test_features,
trader_profile={'cluster': 2, 'type_name': 'Aggressive'},
behavior_alerts={
'alerts': [{'type': 'REVENGE_TRADING', 'severity': 'CRITICAL', 'message': 'test'}],
'risk_multiplier': 0.3, 'status': 'critical'
},
current_atr=0.015,
horizon_idx=0,
)
test("Critical alert forces HOLD", critical_decision.signal == Signal.HOLD)
test("Alert in reasoning", any('CRITICAL' in r for r in critical_decision.reasoning))
# Test format_decision
formatted = format_decision(decision)
test("format_decision returns string", isinstance(formatted, str))
test("format_decision has signal", decision.signal.value in formatted)
test("format_decision has confidence", 'Confidence' in formatted)
# Test without model (defaults)
engine_no_model = DecisionEngine()
default_decision = engine_no_model.make_decision(
market_features=test_features,
current_atr=0.015,
)
test("Works without model (defaults)", isinstance(default_decision, TradingDecision))
print(f"\n 🎯 Decision engine: all signal types, alerts, formatting verified")
except Exception as e:
test("Decision engine module", False, f"EXCEPTION: {e}")
traceback.print_exc()
# ═══════════════════════════════════════════════════════
# TEST 7: EVALUATION & BACKTESTING
# ═══════════════════════════════════════════════════════
section("TEST 7: EVALUATION & BACKTESTING")
try:
from trading_intelligence.evaluation import Evaluator, format_evaluation
evaluator = Evaluator(prediction_horizons=[1, 5, 20], trading_costs=0.001)
test_loader = DataLoader(
TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test)),
batch_size=128, shuffle=False
)
eval_results = evaluator.evaluate_predictions(model, test_loader, device)
test("Evaluation returns dict", isinstance(eval_results, dict))
test("Has summary", 'summary' in eval_results)
test("Has horizon_1", 'horizon_1' in eval_results)
test("Has horizon_5", 'horizon_5' in eval_results)
test("Has horizon_20", 'horizon_20' in eval_results)
summary = eval_results['summary']
test("num_test_samples > 0", summary['num_test_samples'] > 0)
test("avg_direction_accuracy in [0,1]", 0 <= summary['avg_direction_accuracy'] <= 1)
test("avg_ic is finite", np.isfinite(summary['avg_ic']))
for h in [1, 5, 20]:
hr = eval_results[f'horizon_{h}']
test(f"H{h} direction_accuracy in [0,1]", 0 <= hr['direction_accuracy'] <= 1)
test(f"H{h} sharpe_ratio is finite", np.isfinite(hr['sharpe_ratio']))
test(f"H{h} max_drawdown in [0,1]", 0 <= hr['max_drawdown'] <= 1)
test(f"H{h} profit_factor >= 0", hr['profit_factor'] >= 0)
test(f"H{h} win_rate in [0,1]", 0 <= hr['win_rate'] <= 1)
test(f"H{h} num_trades > 0", hr['num_trades'] > 0)
print(f" H{h}: DA={hr['direction_accuracy']:.1%} IC={hr['information_coefficient']:.4f} "
f"Sharpe={hr['sharpe_ratio']:.2f} DD={hr['max_drawdown']:.1%} PF={hr['profit_factor']:.2f}")
# Test format_evaluation
formatted = format_evaluation(eval_results)
test("format_evaluation returns string", isinstance(formatted, str))
test("format_evaluation has content", len(formatted) > 100)
print(f"\n πŸ“Š Evaluation: all metrics computed and validated")
except Exception as e:
test("Evaluation module", False, f"EXCEPTION: {e}")
traceback.print_exc()
# ═══════════════════════════════════════════════════════
# TEST 8: TRAINING PIPELINE (high-level API)
# ═══════════════════════════════════════════════════════
section("TEST 8: TRAINING PIPELINE (high-level)")
try:
from trading_intelligence.training import TrainingPipeline, FinancialTimeSeriesDataset
pipeline = TrainingPipeline(
lookback_window=30,
prediction_horizons=[1, 5, 20],
d_model=64, n_heads=4, n_layers=2, d_ff=128,
patch_len=6, stride=3, dropout=0.1,
learning_rate=1e-3, batch_size=128,
max_epochs=3, patience=2,
)
train_loader, val_loader, test_loader = pipeline.prepare_data(df)
test("Pipeline prepare_data", True)
test("Pipeline model initialized", pipeline.model is not None)
test("Pipeline loss_fn initialized", pipeline.loss_fn is not None)
results = pipeline.train(train_loader, val_loader)
test("Pipeline train completes", 'best_val_loss' in results)
test("Pipeline training history", len(results['history']) > 0)
# Save/load cycle
pipeline.save_model('/app/models/pipeline_model.pt')
test("Pipeline model saved", os.path.exists('/app/models/pipeline_model.pt'))
pipeline2 = TrainingPipeline(lookback_window=30, prediction_horizons=[1,5,20])
pipeline2.load_model('/app/models/pipeline_model.pt')
test("Pipeline model loaded", pipeline2.model is not None)
print(f"\n πŸ”§ Training pipeline: prepare, train, save, load all verified")
except Exception as e:
test("Training pipeline", False, f"EXCEPTION: {e}")
traceback.print_exc()
# ═══════════════════════════════════════════════════════
# FULL INTEGRATION TEST
# ═══════════════════════════════════════════════════════
section("INTEGRATION TEST: FULL PIPELINE")
try:
print(" Running complete end-to-end flow...")
# 1. Raw data β†’ Features
fe = FeatureEngine(lookback_window=30, prediction_horizons=[1, 5, 20])
features = fe.compute_all_features(df)
features_norm, _ = fe.normalize_features(features)
# 2. Features β†’ Sequences
target_cols = []
for h in [1, 5, 20]:
target_cols.extend([f'target_direction_{h}', f'target_return_{h}'])
X_all, y_all = fe.create_sequences(features_norm, target_cols=target_cols)
valid = np.isfinite(X_all).all(axis=(1, 2)) & np.isfinite(y_all).all(axis=1)
X_all, y_all = X_all[valid], y_all[valid]
# 3. Train model
model_final = TradingTransformer(
num_channels=X_all.shape[1], seq_len=30, patch_len=6, stride=3,
d_model=64, n_heads=4, n_layers=2, d_ff=128, num_horizons=3
)
# 4. Get prediction
model_final.eval()
with torch.no_grad():
sample = torch.FloatTensor(X_all[-1:])
pred = model_final.predict_with_confidence(sample)
# 5. Risk assessment
rm = RiskModel(market_dim=64, portfolio_dim=64, behavior_dim=64)
rm.eval()
# 6. Personalization
profiler = TraderProfiler()
alert_system = BehaviorAlertSystem()
pers = PersonalizationEngine()
sample_trades = [
{'entry_price': 100, 'exit_price': 101.5, 'size': 0.05, 'pnl': 75, 'holding_time': 120, 'direction': 1}
] * 15 + [
{'entry_price': 100, 'exit_price': 99, 'size': 0.05, 'pnl': -50, 'holding_time': 60, 'direction': -1}
] * 8
trader_feats = profiler.extract_behavior_features(sample_trades)
trader_profile = profiler.predict_type(trader_feats)
alerts = alert_system.analyze(sample_trades[-5:], 100000, 1.0)
# 7. Decision
decision_engine = DecisionEngine(
prediction_model=model_final,
personalization_engine=pers,
)
final_decision = decision_engine.make_decision(
market_features=X_all[-1:],
trader_profile=trader_profile,
behavior_alerts=alerts,
current_atr=0.015,
horizon_idx=1,
)
test("Integration: features computed", len(features) > 0)
test("Integration: sequences created", X_all.shape[0] > 0)
test("Integration: prediction made", pred['direction_probs'].shape == (1, 3))
test("Integration: trader profiled", trader_profile['type_name'] in TRADER_TYPES.values())
test("Integration: decision generated", isinstance(final_decision.signal, Signal))
print(f"\n Full pipeline output:")
print(format_decision(final_decision))
test("Integration: complete pipeline works", True)
except Exception as e:
test("Integration test", False, f"EXCEPTION: {e}")
traceback.print_exc()
# ═══════════════════════════════════════════════════════
# FINAL SUMMARY
# ═══════════════════════════════════════════════════════
section("TEST SUMMARY")
total = PASS + FAIL
print(f"\n βœ… Passed: {PASS}/{total}")
print(f" ❌ Failed: {FAIL}/{total}")
print(f" Pass Rate: {PASS/total*100:.1f}%")
if FAIL == 0:
print(f"\n πŸŽ‰ ALL TESTS PASSED!")
else:
print(f"\n ⚠️ {FAIL} test(s) need attention")
print(f"\n{'='*70}")