#!/usr/bin/env python3 """ Quick system test - lightweight version for CPU sandbox. """ import sys, os, time, json, warnings warnings.filterwarnings('ignore') # Force unbuffered output sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', buffering=1) import numpy as np import pandas as pd import torch sys.path.insert(0, '/app') print("=" * 70) print(" AI-POWERED TRADING INTELLIGENCE SYSTEM v1.0") print("=" * 70) start = time.time() # ═══════════════════════════════════════════ # 1. GENERATE DATA # ═══════════════════════════════════════════ print("\n[1/5] Generating realistic financial data...") np.random.seed(42) num_days = 1500 dt = 1/252 prices = [150.0] vol = 0.20 for i in range(num_days - 1): vol = vol + 0.1 * (0.20 - vol) * dt + 0.3 * np.sqrt(dt) * np.random.normal() vol = max(vol, 0.05) ret = (0.08 - 0.5 * vol**2) * dt + vol * np.sqrt(dt) * np.random.normal() prices.append(prices[-1] * np.exp(ret)) df = pd.DataFrame({ 'date': pd.date_range('2019-01-02', periods=num_days, freq='B')[:num_days], 'open': [p * (1 + np.random.normal(0, 0.002)) for p in prices], 'high': [p * (1 + abs(np.random.normal(0, 0.01))) for p in prices], 'low': [p * (1 - abs(np.random.normal(0, 0.01))) for p in prices], 'close': prices, 'volume': [int(1e6 * np.exp(np.random.normal(0, 0.3))) for _ in range(num_days)], }) # Fix OHLC consistency df['high'] = df[['open', 'high', 'close']].max(axis=1) * (1 + abs(np.random.normal(0, 0.002, num_days))) df['low'] = df[['open', 'low', 'close']].min(axis=1) * (1 - abs(np.random.normal(0, 0.002, num_days))) print(f" Generated {num_days} days: ${prices[0]:.2f} -> ${prices[-1]:.2f}") # ═══════════════════════════════════════════ # 2. FEATURE ENGINEERING # ═══════════════════════════════════════════ print("\n[2/5] Computing features...") from trading_intelligence.feature_engine import FeatureEngine fe = FeatureEngine(lookback_window=30, prediction_horizons=[1, 5, 20]) features = fe.compute_all_features(df) features_norm, norm_params = fe.normalize_features(features) print(f" Features: {len(fe.feature_names)} channels") print(f" Samples after windowing: {len(features)}") # Create sequences target_cols = [] for h in [1, 5, 20]: target_cols.extend([f'target_direction_{h}', f'target_return_{h}']) X, y = fe.create_sequences(features_norm, target_cols=target_cols) valid = np.isfinite(X).all(axis=(1, 2)) & np.isfinite(y).all(axis=1) X, y = X[valid], y[valid] print(f" X shape: {X.shape}, y shape: {y.shape}") # Split n = len(X) train_end = int(n * 0.7) val_end = int(n * 0.85) X_train, y_train = X[:train_end], y[:train_end] X_val, y_val = X[train_end:val_end], y[train_end:val_end] X_test, y_test = X[val_end:], y[val_end:] print(f" Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}") # ═══════════════════════════════════════════ # 3. MODEL TRAINING # ═══════════════════════════════════════════ print("\n[3/5] Training prediction model...") from trading_intelligence.prediction_model import TradingTransformer, MultiTaskLoss from torch.utils.data import TensorDataset, DataLoader device = torch.device('cpu') num_channels = X.shape[1] model = TradingTransformer( num_channels=num_channels, seq_len=30, patch_len=6, stride=3, d_model=64, n_heads=4, n_layers=2, d_ff=128, num_horizons=3, dropout=0.1, ).to(device) loss_fn = MultiTaskLoss(num_horizons=3).to(device) params = sum(p.numel() for p in model.parameters()) print(f" Model: {params:,} parameters") optimizer = torch.optim.AdamW( list(model.parameters()) + list(loss_fn.parameters()), lr=1e-3, weight_decay=1e-4 ) train_ds = TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)) val_ds = TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)) train_loader = DataLoader(train_ds, batch_size=128, shuffle=True) val_loader = DataLoader(val_ds, batch_size=128, shuffle=False) best_val = float('inf') best_state = None for epoch in range(15): model.train() train_loss = 0 n_batch = 0 for xb, yb in train_loader: xb, yb = xb.to(device), yb.to(device) preds = model(xb) directions = torch.stack([yb[:, i*2] for i in range(3)], dim=1) returns = torch.stack([yb[:, i*2+1] for i in range(3)], dim=1) losses = loss_fn(preds, {'direction': directions, 'returns': returns}) optimizer.zero_grad() losses['total_loss'].backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() train_loss += losses['total_loss'].item() n_batch += 1 # Validate model.eval() val_loss = 0 val_batches = 0 correct = np.zeros(3) total = 0 with torch.no_grad(): for xb, yb in val_loader: xb, yb = xb.to(device), yb.to(device) preds = model(xb) directions = torch.stack([yb[:, i*2] for i in range(3)], dim=1) returns = torch.stack([yb[:, i*2+1] for i in range(3)], dim=1) losses = loss_fn(preds, {'direction': directions, 'returns': returns}) val_loss += losses['total_loss'].item() val_batches += 1 dir_preds = (torch.sigmoid(preds['direction_logits']) > 0.5).float() for h in range(3): correct[h] += (dir_preds[:, h] == directions[:, h]).sum().item() total += len(xb) tl = train_loss / max(n_batch, 1) vl = val_loss / max(val_batches, 1) accs = [correct[h] / max(total, 1) for h in range(3)] print(f" Epoch {epoch+1:2d} | Train: {tl:.4f} | Val: {vl:.4f} | " f"DA-1d: {accs[0]:.1%} | DA-5d: {accs[1]:.1%} | DA-20d: {accs[2]:.1%}") if vl < best_val: best_val = vl best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()} if best_state: model.load_state_dict(best_state) model.to(device) # Save os.makedirs('/app/models', exist_ok=True) torch.save({'model_state': model.state_dict(), 'config': {'num_channels': num_channels}}, '/app/models/TECH1_model.pt') print(f" Best val loss: {best_val:.4f}") # ═══════════════════════════════════════════ # 4. EVALUATION (BACKTEST) # ═══════════════════════════════════════════ print("\n[4/5] Backtesting on test set...") from trading_intelligence.evaluation import Evaluator, format_evaluation evaluator = Evaluator(prediction_horizons=[1, 5, 20], trading_costs=0.001) test_ds = TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test)) test_loader = DataLoader(test_ds, batch_size=128, shuffle=False) eval_results = evaluator.evaluate_predictions(model, test_loader, device) print(format_evaluation(eval_results)) # ═══════════════════════════════════════════ # 5. RISK MODEL + PERSONALIZATION + DECISIONS # ═══════════════════════════════════════════ print("\n[5/5] Risk Model + Personalization + Decision Engine...") # Risk Model Demo from trading_intelligence.risk_model import RiskModel from trading_intelligence.personalization import TraderProfiler, BehaviorAlertSystem, PersonalizationEngine, TRADER_TYPES from trading_intelligence.decision_engine import DecisionEngine, format_decision risk_model = RiskModel(market_dim=64, portfolio_dim=64, behavior_dim=64) risk_model.eval() with torch.no_grad(): market_state = torch.randn(2, 64) positions = torch.randn(2, 5, 8) position_mask = torch.ones(2, 5, dtype=torch.bool) position_mask[:, 3:] = False account = torch.tensor([[100000, 10000, 0.05, 3, 0.3, 0.7], [50000, 5000, 0.15, 3, 0.5, 0.5]], dtype=torch.float32) trades = torch.randn(2, 20, 12) risk_out = risk_model(market_state, positions, account, trades, position_mask) print("\n RISK MODEL OUTPUTS:") for i, label in enumerate(["Conservative Trader", "Aggressive Trader"]): print(f"\n {label}:") print(f" Risk Score: {risk_out['risk_score'][i]:.3f}") print(f" Position Size: {risk_out['adjusted_position_size'][i]:.1%}") print(f" SL ATR Multiple: {risk_out['stop_loss_atr_mult'][i]:.2f}") print(f" TP ATR Multiple: {risk_out['take_profit_atr_mult'][i]:.2f}") dd = risk_out['drawdown_probs'][i] print(f" P(DD>5/10/15/20%): {dd[0]:.0%}/{dd[1]:.0%}/{dd[2]:.0%}/{dd[3]:.0%}") beh = risk_out['behavior_profile'] print(f" Risk Appetite: {beh['risk_appetite'][i]:.3f}") print(f" Overtrading: {beh['overtrading_prob'][i]:.0%}") print(f" Revenge Trading: {beh['revenge_trading_prob'][i]:.0%}") tt = torch.argmax(beh['trader_type_logits'][i]).item() print(f" Trader Type: {TRADER_TYPES[tt]}") # Personalization Demo print("\n PERSONALIZATION:") profiler = TraderProfiler() alert_system = BehaviorAlertSystem() personalization = PersonalizationEngine() for name, trades_list, portfolio_val in [ ("Conservative Carol", [{'entry_price': 100, 'exit_price': 101, 'size': 0.01, 'pnl': 10, 'holding_time': 2880, 'direction': 1}] * 20 + [{'entry_price': 100, 'exit_price': 99.5, 'size': 0.01, 'pnl': -5, 'holding_time': 1440, 'direction': 1}] * 8, 100000), ("Aggressive Alex", [{'entry_price': 100, 'exit_price': 105, 'size': 0.15, 'pnl': 750, 'holding_time': 60, 'direction': 1}] * 12 + [{'entry_price': 100, 'exit_price': 93, 'size': 0.20, 'pnl': -1400, 'holding_time': 30, 'direction': 1}] * 10, 50000), ("Scalper Sam", [{'entry_price': 100, 'exit_price': 100.1, 'size': 0.03, 'pnl': 3, 'holding_time': 2, 'direction': 1}] * 80 + [{'entry_price': 100, 'exit_price': 99.95, 'size': 0.03, 'pnl': -1.5, 'holding_time': 1, 'direction': -1}] * 50, 75000), ]: feats = profiler.extract_behavior_features(trades_list) profile = profiler.predict_type(feats) alerts = alert_system.analyze(trades_list[-10:], portfolio_val, 1.0) params = personalization.get_personalized_params(profile, alerts) print(f"\n {name}: Type={profile['type_name']}, Win={profile['features']['win_rate']:.0%}, " f"PF={profile['features']['profit_factor']:.1f}, Status={alerts['status'].upper()}") print(f" -> Max Position: {params['max_position_pct']:.1%}, Min Confidence: {params['min_confidence']:.0%}") for a in alerts['alerts']: print(f" [{a['severity']}] {a['type']}") # Decision Engine Demo print("\n DECISION ENGINE:") engine = DecisionEngine(prediction_model=model, personalization_engine=personalization) market_feats = np.random.randn(1, num_channels, 30).astype(np.float32) decisions = engine.make_multi_horizon_decisions( market_features=market_feats, trader_profile={'cluster': 1, 'type_name': 'Moderate'}, behavior_alerts={'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'}, current_atr=0.015, ) for d in decisions: print(format_decision(d)) # Decision with alert override alert_decision = engine.make_decision( market_features=market_feats, trader_profile={'cluster': 2, 'type_name': 'Aggressive'}, behavior_alerts={ 'alerts': [{'type': 'REVENGE_TRADING', 'severity': 'CRITICAL', 'message': 'Position size tripled after loss'}], 'risk_multiplier': 0.3, 'status': 'critical' }, current_atr=0.015, horizon_idx=0, ) print("\n WITH CRITICAL ALERT:") print(format_decision(alert_decision)) # Save results elapsed = time.time() - start results_json = { 'eval_results': {k: v for k, v in eval_results.items() if k != 'summary' or True}, 'model_params': params, 'elapsed_seconds': elapsed, } # Clean for JSON serialization def clean_for_json(obj): if isinstance(obj, dict): return {k: clean_for_json(v) for k, v in obj.items() if k not in ['equity_curve', 'daily_returns']} elif isinstance(obj, (np.floating, np.integer)): return float(obj) elif isinstance(obj, np.ndarray): return obj.tolist() return obj with open('/app/results_summary.json', 'w') as f: json.dump(clean_for_json(results_json), f, indent=2, default=str) print(f"\n{'='*70}") print(f" COMPLETE in {elapsed:.1f}s") print(f" Model saved: /app/models/TECH1_model.pt") print(f" Results: /app/results_summary.json") print(f"{'='*70}")