# Source: Hugging Face upload by avinashhm - "Add run_quick.py" (commit 589b8a7, verified)
#!/usr/bin/env python3
"""
Quick system test - lightweight version for CPU sandbox.
"""
import sys, os, time, json, warnings
warnings.filterwarnings('ignore')
# Force line-buffered output so progress prints appear immediately.
# reconfigure() (Python 3.7+) replaces the old os.fdopen(sys.stdout.fileno())
# trick, which detached the original stdout wrapper and could close fd 1
# when the orphaned object was garbage-collected.
try:
    sys.stdout.reconfigure(line_buffering=True)
except (AttributeError, ValueError, OSError):
    pass  # stdout replaced (e.g. by a test harness) - leave it as-is
import numpy as np
import pandas as pd
import torch
sys.path.insert(0, '/app')
print("=" * 70)
print(" AI-POWERED TRADING INTELLIGENCE SYSTEM v1.0")
print("=" * 70)
start = time.time()
# ═══════════════════════════════════════════
# 1. GENERATE DATA
# ═══════════════════════════════════════════
print("\n[1/5] Generating realistic financial data...")
np.random.seed(42)
num_days = 1500
dt = 1 / 252
# Simulate a price path with mean-reverting volatility (floored at 5%)
# driving log-normal daily returns.  RNG call order matters for the seed.
prices = [150.0]
vol = 0.20
for _ in range(num_days - 1):
    vol += 0.1 * (0.20 - vol) * dt + 0.3 * np.sqrt(dt) * np.random.normal()
    vol = max(vol, 0.05)  # keep volatility strictly positive
    drift = (0.08 - 0.5 * vol ** 2) * dt
    shock = vol * np.sqrt(dt) * np.random.normal()
    prices.append(prices[-1] * np.exp(drift + shock))
# Build OHLCV bars around the close series with small random perturbations.
df = pd.DataFrame({
    'date': pd.date_range('2019-01-02', periods=num_days, freq='B')[:num_days],
    'open': [p * (1 + np.random.normal(0, 0.002)) for p in prices],
    'high': [p * (1 + abs(np.random.normal(0, 0.01))) for p in prices],
    'low': [p * (1 - abs(np.random.normal(0, 0.01))) for p in prices],
    'close': prices,
    'volume': [int(1e6 * np.exp(np.random.normal(0, 0.3))) for _ in range(num_days)],
})
# Fix OHLC consistency
df['high'] = df[['open', 'high', 'close']].max(axis=1) * (1 + abs(np.random.normal(0, 0.002, num_days)))
df['low'] = df[['open', 'low', 'close']].min(axis=1) * (1 - abs(np.random.normal(0, 0.002, num_days)))
print(f" Generated {num_days} days: ${prices[0]:.2f} -> ${prices[-1]:.2f}")
# ═══════════════════════════════════════════
# 2. FEATURE ENGINEERING
# ═══════════════════════════════════════════
print("\n[2/5] Computing features...")
from trading_intelligence.feature_engine import FeatureEngine
fe = FeatureEngine(lookback_window=30, prediction_horizons=[1, 5, 20])
features = fe.compute_all_features(df)
features_norm, norm_params = fe.normalize_features(features)
print(f" Features: {len(fe.feature_names)} channels")
print(f" Samples after windowing: {len(features)}")
# One (direction, return) target pair per prediction horizon.
target_cols = [f'target_{kind}_{h}'
               for h in (1, 5, 20)
               for kind in ('direction', 'return')]
X, y = fe.create_sequences(features_norm, target_cols=target_cols)
# Drop any sequence containing NaN/inf in inputs or targets.
keep = np.isfinite(X).all(axis=(1, 2)) & np.isfinite(y).all(axis=1)
X, y = X[keep], y[keep]
print(f" X shape: {X.shape}, y shape: {y.shape}")
# Chronological 70/15/15 split - no shuffling for time-series data.
n = len(X)
train_end, val_end = int(n * 0.7), int(n * 0.85)
X_train, y_train = X[:train_end], y[:train_end]
X_val, y_val = X[train_end:val_end], y[train_end:val_end]
X_test, y_test = X[val_end:], y[val_end:]
print(f" Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")
# ═══════════════════════════════════════════
# 3. MODEL TRAINING
# ═══════════════════════════════════════════
print("\n[3/5] Training prediction model...")
from trading_intelligence.prediction_model import TradingTransformer, MultiTaskLoss
from torch.utils.data import TensorDataset, DataLoader
device = torch.device('cpu')
num_channels = X.shape[1]
# A deliberately small patch-transformer so the demo trains on CPU.
model = TradingTransformer(
    num_channels=num_channels, seq_len=30, patch_len=6, stride=3,
    d_model=64, n_heads=4, n_layers=2, d_ff=128,
    num_horizons=3, dropout=0.1,
).to(device)
loss_fn = MultiTaskLoss(num_horizons=3).to(device)
params = sum(p.numel() for p in model.parameters())
print(f" Model: {params:,} parameters")
# A single optimizer updates both the network and the loss module's
# learnable parameters (e.g. task weights).
optimizer = torch.optim.AdamW(
    [*model.parameters(), *loss_fn.parameters()],
    lr=1e-3, weight_decay=1e-4,
)
train_loader = DataLoader(
    TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)),
    batch_size=128, shuffle=True,
)
val_loader = DataLoader(
    TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)),
    batch_size=128, shuffle=False,
)
best_val = float('inf')
best_state = None
def _split_targets(yb):
    """Split an interleaved target batch [dir_1, ret_1, dir_5, ret_5, dir_20, ret_20]
    into (directions, returns), each of shape (batch, 3 horizons).

    Extracted because the identical unpacking was duplicated in the train
    and validation loops.
    """
    directions = torch.stack([yb[:, i * 2] for i in range(3)], dim=1)
    returns = torch.stack([yb[:, i * 2 + 1] for i in range(3)], dim=1)
    return directions, returns

for epoch in range(15):
    # --- training pass ---
    model.train()
    train_loss = 0
    n_batch = 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        preds = model(xb)
        directions, returns = _split_targets(yb)
        losses = loss_fn(preds, {'direction': directions, 'returns': returns})
        optimizer.zero_grad()
        losses['total_loss'].backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        train_loss += losses['total_loss'].item()
        n_batch += 1
    # Validate
    model.eval()
    val_loss = 0
    val_batches = 0
    correct = np.zeros(3)
    total = 0
    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(device), yb.to(device)
            preds = model(xb)
            directions, returns = _split_targets(yb)
            losses = loss_fn(preds, {'direction': directions, 'returns': returns})
            val_loss += losses['total_loss'].item()
            val_batches += 1
            # Per-horizon direction accuracy at a 0.5 probability threshold.
            dir_preds = (torch.sigmoid(preds['direction_logits']) > 0.5).float()
            for h in range(3):
                correct[h] += (dir_preds[:, h] == directions[:, h]).sum().item()
            total += len(xb)
    tl = train_loss / max(n_batch, 1)
    vl = val_loss / max(val_batches, 1)
    accs = [correct[h] / max(total, 1) for h in range(3)]
    print(f" Epoch {epoch+1:2d} | Train: {tl:.4f} | Val: {vl:.4f} | "
          f"DA-1d: {accs[0]:.1%} | DA-5d: {accs[1]:.1%} | DA-20d: {accs[2]:.1%}")
    # Track the best-by-validation weights on CPU (model selection).
    if vl < best_val:
        best_val = vl
        best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
# Restore the best checkpoint before evaluation/saving.  Explicit None check:
# a state dict's truthiness is not the right test for "was a best epoch seen".
if best_state is not None:
    model.load_state_dict(best_state)
    model.to(device)
# Persist the selected checkpoint plus the config needed to rebuild it.
os.makedirs('/app/models', exist_ok=True)
checkpoint = {
    'model_state': model.state_dict(),
    'config': {'num_channels': num_channels},
}
torch.save(checkpoint, '/app/models/TECH1_model.pt')
print(f" Best val loss: {best_val:.4f}")
# ═══════════════════════════════════════════
# 4. EVALUATION (BACKTEST)
# ═══════════════════════════════════════════
print("\n[4/5] Backtesting on test set...")
from trading_intelligence.evaluation import Evaluator, format_evaluation
evaluator = Evaluator(prediction_horizons=[1, 5, 20], trading_costs=0.001)
# Held-out chronological tail; no shuffling so the backtest stays ordered.
test_loader = DataLoader(
    TensorDataset(torch.FloatTensor(X_test), torch.FloatTensor(y_test)),
    batch_size=128, shuffle=False,
)
eval_results = evaluator.evaluate_predictions(model, test_loader, device)
print(format_evaluation(eval_results))
# ═══════════════════════════════════════════
# 5. RISK MODEL + PERSONALIZATION + DECISIONS
# ═══════════════════════════════════════════
print("\n[5/5] Risk Model + Personalization + Decision Engine...")
# Risk Model Demo
from trading_intelligence.risk_model import RiskModel
from trading_intelligence.personalization import TraderProfiler, BehaviorAlertSystem, PersonalizationEngine, TRADER_TYPES
from trading_intelligence.decision_engine import DecisionEngine, format_decision
risk_model = RiskModel(market_dim=64, portfolio_dim=64, behavior_dim=64)
risk_model.eval()
# Two synthetic accounts (row 0 conservative, row 1 aggressive); random
# market/position/trade tensors, inference only.
with torch.no_grad():
    market_state = torch.randn(2, 64)
    positions = torch.randn(2, 5, 8)
    position_mask = torch.ones(2, 5, dtype=torch.bool)
    position_mask[:, 3:] = False  # only the first 3 of 5 position slots are live
    account = torch.tensor([[100000, 10000, 0.05, 3, 0.3, 0.7],
                            [50000, 5000, 0.15, 3, 0.5, 0.5]], dtype=torch.float32)
    trades = torch.randn(2, 20, 12)
    risk_out = risk_model(market_state, positions, account, trades, position_mask)
print("\n RISK MODEL OUTPUTS:")
labels = ("Conservative Trader", "Aggressive Trader")
for i, label in enumerate(labels):
    print(f"\n {label}:")
    print(f" Risk Score: {risk_out['risk_score'][i]:.3f}")
    print(f" Position Size: {risk_out['adjusted_position_size'][i]:.1%}")
    print(f" SL ATR Multiple: {risk_out['stop_loss_atr_mult'][i]:.2f}")
    print(f" TP ATR Multiple: {risk_out['take_profit_atr_mult'][i]:.2f}")
    dd = risk_out['drawdown_probs'][i]
    print(f" P(DD>5/10/15/20%): {dd[0]:.0%}/{dd[1]:.0%}/{dd[2]:.0%}/{dd[3]:.0%}")
    beh = risk_out['behavior_profile']
    print(f" Risk Appetite: {beh['risk_appetite'][i]:.3f}")
    print(f" Overtrading: {beh['overtrading_prob'][i]:.0%}")
    print(f" Revenge Trading: {beh['revenge_trading_prob'][i]:.0%}")
    tt = torch.argmax(beh['trader_type_logits'][i]).item()
    print(f" Trader Type: {TRADER_TYPES[tt]}")
# Personalization Demo
print("\n PERSONALIZATION:")
profiler = TraderProfiler()
alert_system = BehaviorAlertSystem()
personalization = PersonalizationEngine()
# Three synthetic trade histories: (name, trades, portfolio value).
for name, trades_list, portfolio_val in [
    ("Conservative Carol",
     [{'entry_price': 100, 'exit_price': 101, 'size': 0.01, 'pnl': 10, 'holding_time': 2880, 'direction': 1}] * 20 +
     [{'entry_price': 100, 'exit_price': 99.5, 'size': 0.01, 'pnl': -5, 'holding_time': 1440, 'direction': 1}] * 8,
     100000),
    ("Aggressive Alex",
     [{'entry_price': 100, 'exit_price': 105, 'size': 0.15, 'pnl': 750, 'holding_time': 60, 'direction': 1}] * 12 +
     [{'entry_price': 100, 'exit_price': 93, 'size': 0.20, 'pnl': -1400, 'holding_time': 30, 'direction': 1}] * 10,
     50000),
    ("Scalper Sam",
     [{'entry_price': 100, 'exit_price': 100.1, 'size': 0.03, 'pnl': 3, 'holding_time': 2, 'direction': 1}] * 80 +
     [{'entry_price': 100, 'exit_price': 99.95, 'size': 0.03, 'pnl': -1.5, 'holding_time': 1, 'direction': -1}] * 50,
     75000),
]:
    feats = profiler.extract_behavior_features(trades_list)
    profile = profiler.predict_type(feats)
    alerts = alert_system.analyze(trades_list[-10:], portfolio_val, 1.0)
    # BUG FIX: this was assigned to `params`, clobbering the module-level
    # model parameter count that the results summary reads later on.
    personal_params = personalization.get_personalized_params(profile, alerts)
    print(f"\n {name}: Type={profile['type_name']}, Win={profile['features']['win_rate']:.0%}, "
          f"PF={profile['features']['profit_factor']:.1f}, Status={alerts['status'].upper()}")
    print(f" -> Max Position: {personal_params['max_position_pct']:.1%}, Min Confidence: {personal_params['min_confidence']:.0%}")
    for a in alerts['alerts']:
        print(f" [{a['severity']}] {a['type']}")
# Decision Engine Demo
print("\n DECISION ENGINE:")
engine = DecisionEngine(prediction_model=model, personalization_engine=personalization)
# One random feature window shaped (batch=1, channels, seq_len=30).
market_feats = np.random.randn(1, num_channels, 30).astype(np.float32)
# Normal path: one decision per horizon, moderate trader, no active alerts.
decisions = engine.make_multi_horizon_decisions(
    market_features=market_feats,
    trader_profile={'cluster': 1, 'type_name': 'Moderate'},
    behavior_alerts={'alerts': [], 'risk_multiplier': 1.0, 'status': 'normal'},
    current_atr=0.015,
)
for d in decisions:
    print(format_decision(d))
# Decision with alert override
# Same features, but a critical revenge-trading alert with a reduced
# risk multiplier - exercises the alert-driven risk throttling path.
alert_decision = engine.make_decision(
    market_features=market_feats,
    trader_profile={'cluster': 2, 'type_name': 'Aggressive'},
    behavior_alerts={
        'alerts': [{'type': 'REVENGE_TRADING', 'severity': 'CRITICAL',
                    'message': 'Position size tripled after loss'}],
        'risk_multiplier': 0.3, 'status': 'critical'
    },
    current_atr=0.015, horizon_idx=0,
)
print("\n WITH CRITICAL ALERT:")
print(format_decision(alert_decision))
# Save results
elapsed = time.time() - start
results_json = {
    # Keep the full evaluation output; the previous filter
    # (`if k != 'summary' or True`) was a no-op that kept every key anyway.
    'eval_results': dict(eval_results),
    # Recompute the parameter count directly from the model rather than
    # trusting the module-level `params`, which demo code above may have
    # reassigned.
    'model_params': sum(p.numel() for p in model.parameters()),
    'elapsed_seconds': elapsed,
}
# Clean for JSON serialization
def clean_for_json(obj):
    """Recursively convert *obj* into JSON-serializable builtins.

    - numpy scalars become Python int/float; ndarrays become nested lists
    - bulky per-bar series ('equity_curve', 'daily_returns') are dropped
    - lists/tuples are cleaned element-wise (the old version did not
      recurse into them, so numpy values nested in lists leaked through)
    - anything else is returned unchanged
    """
    if isinstance(obj, dict):
        return {k: clean_for_json(v) for k, v in obj.items()
                if k not in ('equity_curve', 'daily_returns')}
    if isinstance(obj, (list, tuple)):
        return [clean_for_json(v) for v in obj]
    if isinstance(obj, np.integer):
        return int(obj)  # was float(obj): turned integer counts into floats
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return obj
# Write the summary; default=str is a last-resort stringifier for any
# value clean_for_json leaves unconverted.
with open('/app/results_summary.json', 'w') as f:
    json.dump(clean_for_json(results_json), f, indent=2, default=str)
banner = '=' * 70
print(f"\n{banner}")
print(f" COMPLETE in {elapsed:.1f}s")
print(f" Model saved: /app/models/TECH1_model.pt")
print(f" Results: /app/results_summary.json")
print(banner)