"""Quick system test - lightweight version for CPU sandbox.

Runs the pipeline end to end on synthetic data: data generation, feature
computation, model training, backtesting, and a demo of the risk,
personalization and decision components.
"""
import sys
import os
import time
import json
import warnings

# Silence library warnings before the heavier imports below can emit any.
warnings.filterwarnings('ignore')

# Line-buffer stdout so progress lines appear immediately when output is piped.
# reconfigure() adjusts the existing stream in place instead of opening a
# second writer on the same file descriptor; the guard keeps the script working
# when stdout has been replaced by an object that lacks the method.
_reconfigure_stdout = getattr(sys.stdout, 'reconfigure', None)
if _reconfigure_stdout is not None:
    _reconfigure_stdout(line_buffering=True)

import numpy as np
import pandas as pd
import torch

# Put /app first on the import path so the trading_intelligence imports further
# down resolve (presumably the package lives there - confirm for other setups).
sys.path.insert(0, '/app')

print("=" * 70)
print(" AI-POWERED TRADING INTELLIGENCE SYSTEM v1.0")
print("=" * 70)
start = time.time()  # wall-clock reference for the elapsed-time report

# ---------------------------------------------------------------------------
# [1/5] Synthetic market data
# ---------------------------------------------------------------------------
print("\n[1/5] Generating realistic financial data...")

np.random.seed(42)  # fixed seed: every run produces the same series
num_days = 1500
dt = 1 / 252  # daily step, assuming the conventional 252 trading days per year
prices = [150.0]
vol = 0.20

# Simulate log-returns whose volatility is itself a mean-reverting random walk.
# The draw order (volatility shock first, then return shock) is kept exactly so
# the seeded path stays the same.
for _ in range(num_days - 1):
    vol = vol + 0.1 * (0.20 - vol) * dt + 0.3 * np.sqrt(dt) * np.random.normal()
    vol = max(vol, 0.05)  # floor keeps volatility strictly positive
    ret = (0.08 - 0.5 * vol**2) * dt + vol * np.sqrt(dt) * np.random.normal()
    prices.append(prices[-1] * np.exp(ret))

# Build OHLCV bars around the simulated closes.  date_range already returns
# exactly num_days business days, so no extra slicing is needed.
df = pd.DataFrame({
    'date': pd.date_range('2019-01-02', periods=num_days, freq='B'),
    'open': [p * (1 + np.random.normal(0, 0.002)) for p in prices],
    'high': [p * (1 + abs(np.random.normal(0, 0.01))) for p in prices],
    'low': [p * (1 - abs(np.random.normal(0, 0.01))) for p in prices],
    'close': prices,
    'volume': [int(1e6 * np.exp(np.random.normal(0, 0.3))) for _ in range(num_days)],
})

# Enforce bar consistency: high is at least max(open, close) and low is at most
# min(open, close), each widened by a small non-negative random margin.
df['high'] = df[['open', 'high', 'close']].max(axis=1) * (1 + np.abs(np.random.normal(0, 0.002, num_days)))
df['low'] = df[['open', 'low', 'close']].min(axis=1) * (1 - np.abs(np.random.normal(0, 0.002, num_days)))

print(f" Generated {num_days} days: ${prices[0]:.2f} -> ${prices[-1]:.2f}")

# ---------------------------------------------------------------------------
# [2/5] Feature engineering, sequence building and dataset split
# ---------------------------------------------------------------------------
print("\n[2/5] Computing features...")
from trading_intelligence.feature_engine import FeatureEngine

# Prediction horizons, defined once and used for both the engine and the
# target column names so the two cannot drift apart.
HORIZONS = (1, 5, 20)

fe = FeatureEngine(lookback_window=30, prediction_horizons=list(HORIZONS))
features = fe.compute_all_features(df)
# NOTE(review): normalization runs on the full history before the
# train/val/test split below.  If normalize_features fits statistics on its
# input, later-period information leaks into training - confirm.
features_norm, norm_params = fe.normalize_features(features)

print(f" Features: {len(fe.feature_names)} channels")
print(f" Samples after windowing: {len(features)}")

# Two targets per horizon, interleaved as direction, return, direction, ...
# The training stage indexes y by this even/odd column order.
target_cols = []
for h in HORIZONS:
    target_cols.extend([f'target_direction_{h}', f'target_return_{h}'])

X, y = fe.create_sequences(features_norm, target_cols=target_cols)
# Keep only samples whose inputs (3-D X) and targets (2-D y) are all finite.
valid = np.isfinite(X).all(axis=(1, 2)) & np.isfinite(y).all(axis=1)
X, y = X[valid], y[valid]

print(f" X shape: {X.shape}, y shape: {y.shape}")

# Contiguous 70% / 15% / 15% split in sample order (no shuffling).
n = len(X)
train_end = int(n * 0.7)
val_end = int(n * 0.85)

X_train, y_train = X[:train_end], y[:train_end]
X_val, y_val = X[train_end:val_end], y[train_end:val_end]
X_test, y_test = X[val_end:], y[val_end:]

print(f" Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")

# ---------------------------------------------------------------------------
# [3/5] Model training
# ---------------------------------------------------------------------------
print("\n[3/5] Training prediction model...")
from trading_intelligence.prediction_model import TradingTransformer, MultiTaskLoss
from torch.utils.data import TensorDataset, DataLoader

device = torch.device('cpu')
num_channels = X.shape[1]
num_horizons = 3  # y holds one (direction, return) column pair per horizon
num_epochs = 15

model = TradingTransformer(
    num_channels=num_channels, seq_len=30, patch_len=6, stride=3,
    d_model=64, n_heads=4, n_layers=2, d_ff=128,
    num_horizons=num_horizons, dropout=0.1,
).to(device)

loss_fn = MultiTaskLoss(num_horizons=num_horizons).to(device)
params = sum(p.numel() for p in model.parameters())
print(f" Model: {params:,} parameters")

# The loss module's own parameters are optimized together with the model's.
optimizer = torch.optim.AdamW(
    list(model.parameters()) + list(loss_fn.parameters()),
    lr=1e-3, weight_decay=1e-4
)

train_ds = TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train))
val_ds = TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val))
train_loader = DataLoader(train_ds, batch_size=128, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=128, shuffle=False)


def _split_targets(yb):
    """Split an interleaved target batch into the dict the loss expects.

    Even columns of ``yb`` are direction targets and odd columns are return
    targets, one pair per horizon.  Returns ``{'direction': ..., 'returns': ...}``
    with each tensor stacked to one column per horizon.
    """
    directions = torch.stack([yb[:, 2 * k] for k in range(num_horizons)], dim=1)
    returns = torch.stack([yb[:, 2 * k + 1] for k in range(num_horizons)], dim=1)
    return {'direction': directions, 'returns': returns}


best_val = float('inf')
best_state = None

for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    train_loss = 0
    n_batch = 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        preds = model(xb)

        losses = loss_fn(preds, _split_targets(yb))
        optimizer.zero_grad()
        losses['total_loss'].backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        train_loss += losses['total_loss'].item()
        n_batch += 1

    # ---- validation pass ----
    model.eval()
    val_loss = 0
    val_batches = 0
    correct = np.zeros(num_horizons)  # correct direction calls per horizon
    total = 0

    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(device), yb.to(device)
            preds = model(xb)
            targets = _split_targets(yb)
            losses = loss_fn(preds, targets)
            val_loss += losses['total_loss'].item()
            val_batches += 1

            # A direction is predicted "up" when its sigmoid probability > 0.5.
            dir_preds = (torch.sigmoid(preds['direction_logits']) > 0.5).float()
            for h in range(num_horizons):
                correct[h] += (dir_preds[:, h] == targets['direction'][:, h]).sum().item()
            total += len(xb)

    # max(..., 1) guards the averages against an empty loader.
    tl = train_loss / max(n_batch, 1)
    vl = val_loss / max(val_batches, 1)
    accs = [correct[h] / max(total, 1) for h in range(num_horizons)]

    print(f" Epoch {epoch+1:2d} | Train: {tl:.4f} | Val: {vl:.4f} | "
          f"DA-1d: {accs[0]:.1%} | DA-5d: {accs[1]:.1%} | DA-20d: {accs[2]:.1%}")

    # Snapshot the weights whenever validation loss improves.
    if vl < best_val:
        best_val = vl
        best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}

# Restore the best snapshot (none exists if no epoch ever improved on inf,
# e.g. when every validation loss was NaN).
if best_state is not None:
    model.load_state_dict(best_state)
    model.to(device)

os.makedirs('/app/models', exist_ok=True)
torch.save({'model_state': model.state_dict(), 'config': {'num_channels': num_channels}},
           '/app/models/TECH1_model.pt')
print(f" Best val loss: {best_val:.4f}")

# ---------------------------------------------------------------------------
# [4/5] Backtest on the held-out test split
# ---------------------------------------------------------------------------
print("\n[4/5] Backtesting on test set...")
from trading_intelligence.evaluation import Evaluator, format_evaluation

evaluator = Evaluator(prediction_horizons=[1, 5, 20], trading_costs=0.001)
test_ds = TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test))
# shuffle=False keeps the test samples in their original order.
test_loader = DataLoader(test_ds, batch_size=128, shuffle=False)

# eval_results is used again further down when the summary JSON is written.
eval_results = evaluator.evaluate_predictions(model, test_loader, device)
print(format_evaluation(eval_results))

# ---------------------------------------------------------------------------
# [5/5] Risk model, personalization and decision engine demos
# ---------------------------------------------------------------------------
print("\n[5/5] Risk Model + Personalization + Decision Engine...")

from trading_intelligence.risk_model import RiskModel
from trading_intelligence.personalization import TraderProfiler, BehaviorAlertSystem, PersonalizationEngine, TRADER_TYPES
from trading_intelligence.decision_engine import DecisionEngine, format_decision

# The risk model is only exercised here with random inputs (it is not trained
# in this script), so the printed values demonstrate the interface only.
risk_model = RiskModel(market_dim=64, portfolio_dim=64, behavior_dim=64)
risk_model.eval()

with torch.no_grad():
    # Batch of two example traders.
    market_state = torch.randn(2, 64)
    positions = torch.randn(2, 5, 8)
    # Only the first three of the five position slots are marked as present.
    position_mask = torch.ones(2, 5, dtype=torch.bool)
    position_mask[:, 3:] = False
    # NOTE(review): the meaning of the six account columns is defined by
    # RiskModel and is not visible here - confirm before editing the values.
    account = torch.tensor([[100000, 10000, 0.05, 3, 0.3, 0.7],
                            [50000, 5000, 0.15, 3, 0.5, 0.5]], dtype=torch.float32)
    trades = torch.randn(2, 20, 12)

    risk_out = risk_model(market_state, positions, account, trades, position_mask)

print("\n RISK MODEL OUTPUTS:")
beh = risk_out['behavior_profile']  # same dict for every row; looked up once
for i, label in enumerate(["Conservative Trader", "Aggressive Trader"]):
    print(f"\n {label}:")
    print(f" Risk Score: {risk_out['risk_score'][i]:.3f}")
    print(f" Position Size: {risk_out['adjusted_position_size'][i]:.1%}")
    print(f" SL ATR Multiple: {risk_out['stop_loss_atr_mult'][i]:.2f}")
    print(f" TP ATR Multiple: {risk_out['take_profit_atr_mult'][i]:.2f}")
    dd = risk_out['drawdown_probs'][i]
    print(f" P(DD>5/10/15/20%): {dd[0]:.0%}/{dd[1]:.0%}/{dd[2]:.0%}/{dd[3]:.0%}")
    print(f" Risk Appetite: {beh['risk_appetite'][i]:.3f}")
    print(f" Overtrading: {beh['overtrading_prob'][i]:.0%}")
    print(f" Revenge Trading: {beh['revenge_trading_prob'][i]:.0%}")
    # The highest-scoring logit selects the trader-type label.
    tt = torch.argmax(beh['trader_type_logits'][i]).item()
    print(f" Trader Type: {TRADER_TYPES[tt]}")

# ---- Personalization demo: profile three hand-written trade histories ----
print("\n PERSONALIZATION:")
profiler = TraderProfiler()
alert_system = BehaviorAlertSystem()
personalization = PersonalizationEngine()

# Each entry is (display name, trade history, portfolio value).
for name, trades_list, portfolio_val in [
    ("Conservative Carol",
     [{'entry_price': 100, 'exit_price': 101, 'size': 0.01, 'pnl': 10, 'holding_time': 2880, 'direction': 1}] * 20 +
     [{'entry_price': 100, 'exit_price': 99.5, 'size': 0.01, 'pnl': -5, 'holding_time': 1440, 'direction': 1}] * 8,
     100000),
    ("Aggressive Alex",
     [{'entry_price': 100, 'exit_price': 105, 'size': 0.15, 'pnl': 750, 'holding_time': 60, 'direction': 1}] * 12 +
     [{'entry_price': 100, 'exit_price': 93, 'size': 0.20, 'pnl': -1400, 'holding_time': 30, 'direction': 1}] * 10,
     50000),
    ("Scalper Sam",
     [{'entry_price': 100, 'exit_price': 100.1, 'size': 0.03, 'pnl': 3, 'holding_time': 2, 'direction': 1}] * 80 +
     [{'entry_price': 100, 'exit_price': 99.95, 'size': 0.03, 'pnl': -1.5, 'holding_time': 1, 'direction': -1}] * 50,
     75000),
]:
    feats = profiler.extract_behavior_features(trades_list)
    profile = profiler.predict_type(feats)
    # Alerts are computed from the ten most recent trades only.
    alerts = alert_system.analyze(trades_list[-10:], portfolio_val, 1.0)
    # Deliberately not called `params`: that module-level name holds the model
    # parameter count set during training and must not be overwritten here.
    trader_params = personalization.get_personalized_params(profile, alerts)

    print(f"\n {name}: Type={profile['type_name']}, Win={profile['features']['win_rate']:.0%}, "
          f"PF={profile['features']['profit_factor']:.1f}, Status={alerts['status'].upper()}")
    print(f" -> Max Position: {trader_params['max_position_pct']:.1%}, Min Confidence: {trader_params['min_confidence']:.0%}")
    for a in alerts['alerts']:
        print(f" [{a['severity']}] {a['type']}")

# ---- Decision engine demo ----
print("\n DECISION ENGINE:")
engine = DecisionEngine(prediction_model=model, personalization_engine=personalization)

# Random placeholder input: one sample, num_channels channels, 30 time steps,
# matching the layout the model was trained on.
market_feats = np.random.randn(1, num_channels, 30).astype(np.float32)
decisions = engine.make_multi_horizon_decisions(
    market_features=market_feats,
    trader_profile={'cluster': 1, 'type_name': 'Moderate'},
    behavior_alerts={'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'},
    current_atr=0.015,
)

for d in decisions:
    print(format_decision(d))

# Same market input, but for a trader with an active critical behavior alert
# and a reduced risk multiplier, to show how the decision changes.
alert_decision = engine.make_decision(
    market_features=market_feats,
    trader_profile={'cluster': 2, 'type_name': 'Aggressive'},
    behavior_alerts={
        'alerts': [{'type': 'REVENGE_TRADING', 'severity': 'CRITICAL',
                    'message': 'Position size tripled after loss'}],
        'risk_multiplier': 0.3, 'status': 'critical'
    },
    current_atr=0.015, horizon_idx=0,
)
print("\n WITH CRITICAL ALERT:")
print(format_decision(alert_decision))

# ---- Assemble the run summary ----
elapsed = time.time() - start
results_json = {
    # Shallow copy of every evaluation entry; nothing is filtered at this step.
    'eval_results': dict(eval_results),
    # Counted directly from the model so the value cannot be affected by any
    # reuse of a `params` variable elsewhere in the script.
    'model_params': sum(p.numel() for p in model.parameters()),
    'elapsed_seconds': elapsed,
}

def clean_for_json(obj):
    """Recursively convert ``obj`` into plain, JSON-friendly Python values.

    Dict entries keyed ``'equity_curve'`` or ``'daily_returns'`` are dropped
    at every nesting level.  NumPy integers, floats and booleans become the
    matching Python scalars (integers stay integers), NumPy arrays become
    nested lists, and lists/tuples are walked element by element so values
    nested inside them are converted as well.  Anything else is returned
    unchanged.

    Args:
        obj: Arbitrary value, typically a nested results dictionary.

    Returns:
        A new structure with the conversions above applied; tuples come back
        as lists.
    """
    if isinstance(obj, dict):
        return {
            key: clean_for_json(value)
            for key, value in obj.items()
            if key not in ('equity_curve', 'daily_returns')
        }
    if isinstance(obj, (list, tuple)):
        return [clean_for_json(item) for item in obj]
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        # Keep integers integral instead of widening them to float.
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    return obj


# ---- Persist the summary and print the closing banner ----
# default=str is a last resort: any value clean_for_json leaves in a form the
# json module cannot encode is written as its str() representation.
with open('/app/results_summary.json', 'w', encoding='utf-8') as f:
    json.dump(clean_for_json(results_json), f, indent=2, default=str)

print(f"\n{'='*70}")
print(f" COMPLETE in {elapsed:.1f}s")
print(f" Model saved: /app/models/TECH1_model.pt")
print(f" Results: /app/results_summary.json")
print(f"{'='*70}")
|