# Spaces: Sleeping
# File size: 5,058 Bytes
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
class VisualTradingAgent:
    """Epsilon-greedy Q-learning agent driven by ``SimpleTradingNetwork``.

    A single network (``policy_net``) is used both for choosing actions and
    for computing the bootstrap targets; there is no separate target network.
    Transitions are kept in a bounded replay buffer and sampled uniformly.

    States are divided by 255.0 before being fed to the network, which
    presumes pixel-valued observations in ``[0, 255]`` -- confirm against the
    environment that produces them.

    Args:
        state_dim: Forwarded to ``SimpleTradingNetwork`` and stored as-is.
        action_dim: Number of discrete actions; actions are ``0..action_dim-1``.
        learning_rate: Step size for the Adam optimizer.
    """

    def __init__(self, state_dim, action_dim, learning_rate=0.001):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.learning_rate = learning_rate
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        # Neural network - simplified for stability
        self.policy_net = SimpleTradingNetwork(state_dim, action_dim).to(self.device)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)

        # Experience replay (deliberately small; oldest entries are evicted)
        self.memory = deque(maxlen=500)
        self.batch_size = 16

        # Training parameters
        self.gamma = 0.99           # discount factor
        self.epsilon = 1.0          # current exploration probability
        self.epsilon_min = 0.1      # floor for epsilon
        self.epsilon_decay = 0.995  # multiplicative decay applied per update()

    def select_action(self, state: np.ndarray) -> int:
        """Pick an action with an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action is returned.
        Otherwise the action with the highest predicted Q-value is returned.
        If the greedy path raises, the error is printed and a random action
        is returned instead (best-effort behavior).

        Args:
            state: A single observation (no batch axis); it is scaled by
                1/255 and a batch axis is prepended before the forward pass.

        Returns:
            The index of the chosen action.
        """
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)

        # The network contains dropout; evaluate it deterministically so the
        # "greedy" action really is the argmax, then restore the prior mode.
        was_training = self.policy_net.training
        try:
            state_normalized = state.astype(np.float32) / 255.0
            state_tensor = torch.as_tensor(
                state_normalized, device=self.device
            ).unsqueeze(0)
            self.policy_net.eval()
            with torch.no_grad():
                q_values = self.policy_net(state_tensor)
            return q_values.argmax().item()
        except Exception as e:
            print(f"Error in action selection: {e}")
            return random.randint(0, self.action_dim - 1)
        finally:
            self.policy_net.train(was_training)

    def store_transition(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` tuple to replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def _sample_batch(self):
        """Draw ``batch_size`` transitions uniformly and return them as device tensors."""
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        def to_scaled_float(frames):
            # Explicit dtype so integer-typed observations are converted too.
            stacked = torch.as_tensor(
                np.array(frames), dtype=torch.float32, device=self.device
            )
            return stacked / 255.0

        return (
            to_scaled_float(states),
            torch.as_tensor(actions, dtype=torch.long, device=self.device),
            torch.as_tensor(rewards, dtype=torch.float32, device=self.device),
            to_scaled_float(next_states),
            torch.as_tensor(dones, dtype=torch.bool, device=self.device),
        )

    def update(self) -> float:
        """Run one gradient step on a sampled minibatch.

        Does nothing and returns 0 while the replay memory holds fewer than
        ``batch_size`` transitions. On success, epsilon is decayed once
        (never below ``epsilon_min``). If anything in the step raises, the
        error is printed and 0 is returned.

        Returns:
            The MSE loss of the step as a Python float, or 0 if the step was
            skipped or failed.
        """
        if len(self.memory) < self.batch_size:
            return 0

        try:
            states, actions, rewards, next_states, dones = self._sample_batch()

            # Bootstrap targets: evaluated without gradients and without
            # dropout noise so the regression target is deterministic.
            self.policy_net.eval()
            with torch.no_grad():
                next_q = self.policy_net(next_states).max(1)[0]
            # Terminal transitions contribute only their immediate reward.
            target_q = rewards + self.gamma * next_q * (~dones).float()

            # Current Q values for the actions actually taken.
            self.policy_net.train()
            q_all = self.policy_net(states)
            # squeeze(1), not squeeze(): keeps shape (batch,) even when batch == 1.
            current_q = q_all.gather(1, actions.unsqueeze(1)).squeeze(1)

            loss = nn.MSELoss()(current_q, target_q)

            self.optimizer.zero_grad()
            loss.backward()
            # Gradient clipping for stability
            torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), 1.0)
            self.optimizer.step()

            # Decay epsilon
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
            return loss.item()
        except Exception as e:
            print(f"Error in update: {e}")
            return 0
        finally:
            # Leave the network in training mode, as it was before this fix
            # introduced the temporary eval() for target computation.
            self.policy_net.train()
class SimpleTradingNetwork(nn.Module):
    """Small CNN mapping channels-last image observations to per-action values.

    The convolutional stack expects 4 input channels. An adaptive average
    pool fixes the feature map at 8x8, so the flattened size (32 * 8 * 8)
    does not depend on the exact input resolution.

    Args:
        state_dim: Accepted for interface compatibility; not used when
            building the layers (the input channel count is fixed at 4).
        action_dim: Number of outputs of the final linear layer.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()

        # Simplified CNN for faster training. Spatial sizes in the comments
        # are for an 84x84 input.
        self.conv_layers = nn.Sequential(
            nn.Conv2d(4, 16, kernel_size=4, stride=2),   # 84x84x4  -> 41x41x16
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=4, stride=2),  # 41x41x16 -> 19x19x32
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, stride=1),  # 19x19x32 -> 17x17x32
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((8, 8)),                # 17x17x32 -> 8x8x32
        )

        # Flattened feature size after the adaptive pool
        self.flattened_size = 32 * 8 * 8

        # Fully connected head
        self.fc_layers = nn.Sequential(
            nn.Linear(self.flattened_size, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, action_dim),
        )

    def forward(self, x):
        """Compute action values for channels-last observations.

        Args:
            x: Tensor of shape ``(batch, H, W, 4)``, or a single observation
                of shape ``(H, W, 4)``, which is treated as a batch of one.

        Returns:
            Tensor of shape ``(batch, action_dim)``; a single observation
            yields shape ``(1, action_dim)``.
        """
        if x.dim() == 3:
            # Single unbatched observation: add the batch axis.
            x = x.unsqueeze(0)

        # Conv2d wants channels-first: (batch, H, W, C) -> (batch, C, H, W)
        x = x.permute(0, 3, 1, 2)
        x = self.conv_layers(x)
        x = x.flatten(1)
        return self.fc_layers(x)