| |
| """ |
| Implements a Deep Q-Network (DQN) Reinforcement Learning agent for Tensorus. |
| |
| The agent interacts with an environment, stores experiences (S, A, R, S', D) |
| in TensorStorage, samples experiences, and trains its Q-network. |
| |
| Note on Experience Storage: |
| - Large tensors (state, next_state) are stored individually in a 'rl_states' dataset. |
| - Experience tuples containing IDs of states/next_states and scalar action/reward/done |
| are stored as metadata in a placeholder tensor within the 'rl_experiences' dataset. |
| - This approach balances tensor-native storage with manageable metadata, but sampling |
| requires retrieving linked state tensors, which might be slow depending on storage backend. |
| """ |
| from typing import Any |
| import torch |
| import torch.nn as nn |
| import torch.optim as optim |
| import torch.nn.functional as F |
| import random |
| import math |
| import logging |
| from typing import Tuple, Optional, Dict, Any |
|
|
| |
| from tensor_storage import TensorStorage |
| from dummy_env import DummyEnv |
|
|
| |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
class DQN(nn.Module):
    """Three-layer MLP mapping an observation vector to per-action Q-values."""

    def __init__(self, n_observations: int, n_actions: int, hidden_size: int = 128):
        super().__init__()
        # Two hidden layers of equal width, one output unit per discrete action.
        self.layer1 = nn.Linear(n_observations, hidden_size)
        self.layer2 = nn.Linear(hidden_size, hidden_size)
        self.layer3 = nn.Linear(hidden_size, n_actions)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return Q-values of shape (batch, n_actions) for the input batch.

        Inputs of any non-float32 dtype are cast to float32 before the
        linear layers are applied.
        """
        h = x if x.dtype == torch.float32 else x.float()
        h = F.relu(self.layer1(h))
        h = F.relu(self.layer2(h))
        return self.layer3(h)
|
|
|
|
| |
class RLAgent:
    """DQN agent that uses TensorStorage as its experience replay buffer.

    State tensors are stored individually in ``state_dataset``; each
    experience tuple is stored as metadata (state/next_state record IDs plus
    scalar action/reward/done) attached to a placeholder tensor in
    ``experience_dataset``. Sampling therefore requires a second lookup to
    retrieve the linked state tensors.
    """

    def __init__(self,
                 tensor_storage: "TensorStorage",
                 state_dim: int,
                 action_dim: int,
                 hidden_size: int = 128,
                 lr: float = 1e-4,
                 gamma: float = 0.99,
                 epsilon_start: float = 0.9,
                 epsilon_end: float = 0.05,
                 epsilon_decay: int = 10000,
                 target_update_freq: int = 500,
                 batch_size: int = 128,
                 experience_dataset: str = "rl_experiences",
                 state_dataset: str = "rl_states"):
        """
        Initializes the RL Agent.

        Args:
            tensor_storage: Instance of TensorStorage.
            state_dim: Dimensionality of the environment state.
            action_dim: Number of discrete actions.
            hidden_size: Hidden layer size for the DQN.
            lr: Learning rate for the optimizer.
            gamma: Discount factor for future rewards.
            epsilon_start: Initial epsilon for epsilon-greedy exploration.
            epsilon_end: Final epsilon value after decay.
            epsilon_decay: Decay time constant (in steps) for epsilon.
            target_update_freq: How often (in steps) to update the target network.
            batch_size: Number of experiences to sample for training.
            experience_dataset: Name of the dataset to store experience metadata.
            state_dataset: Name of the dataset to store state/next_state tensors.

        Raises:
            TypeError: If tensor_storage is not a TensorStorage instance.
        """
        if not isinstance(tensor_storage, TensorStorage):
            raise TypeError("tensor_storage must be an instance of TensorStorage")

        self.tensor_storage = tensor_storage
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.target_update_freq = target_update_freq

        self.experience_dataset = experience_dataset
        self.state_dataset = state_dataset

        # Ensure both backing datasets exist; TensorStorage.get_dataset raises
        # ValueError when a dataset is missing, so create it lazily.
        for ds_name in [self.experience_dataset, self.state_dataset]:
            try:
                self.tensor_storage.get_dataset(ds_name)
            except ValueError:
                logger.info(f"Dataset '{ds_name}' not found. Creating it.")
                self.tensor_storage.create_dataset(ds_name)

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"RL Agent using device: {self.device}")

        # policy_net is trained every step; target_net provides stable TD
        # targets and is synced from policy_net every target_update_freq steps.
        self.policy_net = DQN(state_dim, action_dim, hidden_size).to(self.device)
        self.target_net = DQN(state_dim, action_dim, hidden_size).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=lr, amsgrad=True)

        # Total environment steps taken; drives epsilon decay and target sync.
        self.steps_done = 0

    def select_action(self, state: torch.Tensor) -> int:
        """Selects an action using an epsilon-greedy strategy.

        Args:
            state: Current state tensor; a 1-D tensor is batched automatically.

        Returns:
            The chosen action index in [0, action_dim).
        """
        sample = random.random()
        # Exponentially decaying exploration threshold from epsilon_start
        # towards epsilon_end with time constant epsilon_decay.
        eps_threshold = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
            math.exp(-1. * self.steps_done / self.epsilon_decay)

        self.steps_done += 1

        if sample > eps_threshold:
            # Exploit: pick the action with the highest predicted Q-value.
            with torch.no_grad():
                state = state.unsqueeze(0).to(self.device) if state.ndim == 1 else state.to(self.device)
                q_values = self.policy_net(state)
                action = q_values.max(1)[1].view(1, 1).item()
                logger.debug(f"Exploiting: Q-Values={q_values.cpu().numpy()}, Chosen Action={action}")
                return action
        else:
            # Explore: pick a uniformly random action.
            action = random.randrange(self.action_dim)
            logger.debug(f"Exploring: Chosen Action={action}")
            return action

    def store_experience(self, state: torch.Tensor, action: int, reward: float, next_state: Optional[torch.Tensor], done: bool) -> None:
        """Stores an experience tuple (S, A, R, S', D) in TensorStorage.

        The state (and next_state, if non-terminal) tensors are inserted into
        the state dataset; the experience record itself is a placeholder
        tensor whose metadata carries the linking IDs and scalars.
        """
        if state is None:
            logger.error("Cannot store experience with None state.")
            return

        # Store the raw state tensor on CPU and keep its record ID.
        state_id = self.tensor_storage.insert(self.state_dataset, state.cpu(), metadata={"component": "state"})

        # Terminal transitions have no next state; keep the ID as None.
        next_state_id = None
        if next_state is not None:
            next_state_id = self.tensor_storage.insert(self.state_dataset, next_state.cpu(), metadata={"component": "next_state"})

        experience_metadata = {
            "state_id": state_id,
            "action": action,
            "reward": reward,
            "next_state_id": next_state_id,
            "done": int(done)
        }

        # The tensor itself is irrelevant; metadata carries the experience.
        placeholder_tensor = torch.tensor([1.0])
        exp_record_id = self.tensor_storage.insert(self.experience_dataset, placeholder_tensor, experience_metadata)
        logger.debug(f"Stored experience record {exp_record_id}: state_id={state_id}, action={action}, reward={reward:.2f}, next_state_id={next_state_id}, done={done}")

    def optimize_model(self) -> None:
        """Samples a batch of experiences and performs one optimization step
        on the policy network using the standard DQN TD-error loss."""
        # Check that enough experiences exist to form a full batch.
        try:
            experience_count = len(self.tensor_storage.get_dataset(self.experience_dataset))
        except ValueError:
            experience_count = 0

        if experience_count < self.batch_size:
            logger.debug(f"Not enough experiences ({experience_count}/{self.batch_size}) to optimize yet.")
            return

        try:
            sampled_metadata_records = self.tensor_storage.sample_dataset(self.experience_dataset, self.batch_size)
        except ValueError:
            logger.error(f"Could not sample from dataset {self.experience_dataset}")
            return
        except Exception as e:
            logger.error(f"Error sampling experiences: {e}", exc_info=True)
            return

        states = []
        actions = []
        rewards = []
        next_states = []          # entry is None for terminal transitions
        non_final_mask_list = []  # True where a valid next_state exists

        for record in sampled_metadata_records:
            meta = record['metadata']
            state_id = meta.get('state_id')
            next_state_id = meta.get('next_state_id')
            action = meta.get('action')
            reward = meta.get('reward')
            done_flag = bool(meta.get('done', 1))

            if state_id is None or action is None or reward is None:
                logger.warning(f"Skipping invalid sampled record: {meta}")
                continue

            # Resolve the linked state tensor; skip the sample if it is gone.
            state_record = self.tensor_storage.get_tensor_by_id(self.state_dataset, state_id)
            if state_record is None:
                logger.warning(f"Could not find state tensor with ID {state_id} for experience {meta.get('record_id')}. Skipping sample.")
                continue
            # Flatten each sample to 1-D so stacking below always yields a
            # (batch, state_dim) matrix regardless of stored shape.
            states.append(state_record['tensor'].reshape(-1))

            current_next_state = None
            if not done_flag and next_state_id:
                next_state_record = self.tensor_storage.get_tensor_by_id(self.state_dataset, next_state_id)
                if next_state_record:
                    current_next_state = next_state_record['tensor'].reshape(-1)
                else:
                    # A non-terminal transition whose next state is missing is
                    # degraded to terminal rather than dropped entirely.
                    logger.warning(f"Could not find next_state tensor with ID {next_state_id} for non-terminal experience {meta.get('record_id')}. Treating as terminal.")
                    done_flag = True

            next_states.append(current_next_state)
            non_final_mask_list.append(not done_flag)
            actions.append(int(action))
            rewards.append(float(reward))

        if not states:
            logger.warning("No valid samples retrieved after state lookup. Optimization step skipped.")
            return

        # BUG FIX: use torch.stack on flattened per-sample tensors instead of
        # torch.cat — cat on 1-D states produced a flattened (batch*state_dim,)
        # vector that broke the Linear layer input for state_dim > 1.
        state_batch = torch.stack(states).float().to(self.device)
        action_batch = torch.tensor(actions, dtype=torch.long, device=self.device).unsqueeze(1)
        reward_batch = torch.tensor(rewards, dtype=torch.float32, device=self.device)
        non_final_mask = torch.tensor(non_final_mask_list, dtype=torch.bool, device=self.device)

        non_final = [ns for ns in next_states if ns is not None]
        non_final_next_states = torch.stack(non_final).float().to(self.device) if non_final else None

        # Q(s, a) for the actions actually taken.
        state_action_values = self.policy_net(state_batch).gather(1, action_batch)

        # max_a' Q_target(s', a') for non-terminal transitions; zero otherwise.
        next_state_values = torch.zeros(len(states), device=self.device)
        if non_final_next_states is not None and non_final_next_states.numel() > 0:
            with torch.no_grad():
                next_state_values[non_final_mask] = self.target_net(non_final_next_states).max(1)[0]

        # TD target: r + gamma * max_a' Q_target(s', a').
        expected_state_action_values = (next_state_values * self.gamma) + reward_batch

        # Huber loss is standard for DQN: robust to outlier TD errors.
        criterion = nn.SmoothL1Loss()
        loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        # Clip gradients element-wise to stabilize training.
        torch.nn.utils.clip_grad_value_(self.policy_net.parameters(), 100)
        self.optimizer.step()

        logger.debug(f"Optimization step done. Loss: {loss.item():.4f}")

        # Periodically sync the target network with the policy network.
        if self.steps_done % self.target_update_freq == 0:
            self._update_target_network()

    def _update_target_network(self):
        """Copies weights from policy_net to target_net."""
        logger.info(f"Updating target network at step {self.steps_done}")
        self.target_net.load_state_dict(self.policy_net.state_dict())

    def train(self, env: "DummyEnv", num_episodes: int):
        """Runs the training loop for a number of episodes.

        Args:
            env: Environment exposing reset() and step(action) -> (s', r, done, info).
            num_episodes: Number of episodes to run.

        Returns:
            List of total rewards, one per episode.
        """
        logger.info(f"--- Starting Training for {num_episodes} episodes ---")
        episode_rewards = []

        for i_episode in range(num_episodes):
            state = env.reset()

            done = False
            current_episode_reward = 0
            steps_in_episode = 0

            while not done:
                action = self.select_action(state)

                next_state, reward, done, _ = env.step(action)
                current_episode_reward += reward
                steps_in_episode += 1

                # Terminal transitions are stored with next_state=None so the
                # replay logic skips bootstrapping from them.
                self.store_experience(state, action, reward, next_state if not done else None, done)

                state = next_state

                # One gradient step per environment step (when buffer is full).
                self.optimize_model()

                if done:
                    break

            episode_rewards.append(current_episode_reward)
            logger.info(f"Episode {i_episode+1}/{num_episodes} finished after {steps_in_episode} steps. Reward: {current_episode_reward:.2f}. Epsilon: {self.epsilon_end + (self.epsilon_start - self.epsilon_end) * math.exp(-1. * self.steps_done / self.epsilon_decay):.3f}")

            # Periodic progress summary over the trailing 50 episodes.
            if (i_episode + 1) % 50 == 0:
                avg_reward = sum(episode_rewards[-50:]) / len(episode_rewards[-50:])
                logger.info(f"  Average reward over last 50 episodes: {avg_reward:.2f}")

        logger.info("--- Training Finished ---")
        return episode_rewards
|
|
|
|
| |
if __name__ == "__main__":
    # Demo script: trains the DQN agent against DummyEnv, then inspects the
    # contents of TensorStorage and (optionally) plots episode rewards.
    logger.info("--- Starting RL Agent Example ---")

    # Backing store shared by the agent for both state tensors and
    # experience metadata records.
    storage = TensorStorage()

    # NOTE(review): assumes DummyEnv caps each episode at max_steps and
    # exposes state_dim / action_dim attributes — confirm against dummy_env.
    env = DummyEnv(max_steps=100)

    # Hyperparameters here intentionally differ from RLAgent's defaults
    # (smaller network/batch, faster target sync) to suit the toy environment.
    agent = RLAgent(
        tensor_storage=storage,
        state_dim=env.state_dim,
        action_dim=env.action_dim,
        hidden_size=64,
        lr=5e-4,
        gamma=0.95,
        epsilon_start=0.95,
        epsilon_end=0.05,
        epsilon_decay=20000,
        target_update_freq=200,
        batch_size=64,
        experience_dataset="dummy_env_experiences",
        state_dataset="dummy_env_states"
    )

    # Run training; returns one total-reward value per episode.
    num_episodes_to_run = 200
    rewards = agent.train(env, num_episodes=num_episodes_to_run)

    # Sanity-check what was written to storage during training: counts of
    # experience/state records plus one sampled experience and its linked state.
    print("\n--- Checking TensorStorage contents (Sample) ---")
    try:
        exp_count = len(storage.get_dataset(agent.experience_dataset))
        state_count = len(storage.get_dataset(agent.state_dataset))
        print(f"Found {exp_count} experience records in '{agent.experience_dataset}'.")
        print(f"Found {state_count} state records in '{agent.state_dataset}'.")

        if exp_count > 0:
            print("\nExample experience record (metadata):")
            sample_exp = storage.sample_dataset(agent.experience_dataset, 1)
            if sample_exp:
                print(sample_exp[0]['metadata'])
                # Follow the state_id link back to the stored state tensor.
                state_id = sample_exp[0]['metadata'].get('state_id')
                if state_id:
                    state_rec = storage.get_tensor_by_id(agent.state_dataset, state_id)
                    if state_rec:
                        print(f"  -> Corresponding state tensor (retrieved): {state_rec['tensor']}")

    except ValueError as e:
        print(f"Could not retrieve datasets: {e}")
    except Exception as e:
        print(f"An error occurred checking storage: {e}")

    logger.info("--- RL Agent Example Finished ---")

    # Optional visualization: per-episode rewards with a 20-episode moving
    # average. Skipped gracefully if matplotlib is not installed.
    try:
        import matplotlib.pyplot as plt
        plt.figure(figsize=(10, 5))
        plt.plot(rewards)
        # Trailing moving average over up to the last 20 episodes.
        moving_avg = [sum(rewards[max(0, i-20):i+1])/len(rewards[max(0, i-20):i+1]) for i in range(len(rewards))]
        plt.plot(moving_avg, linestyle='--', label='Moving Avg (20 episodes)')
        plt.title("Episode Rewards over Time")
        plt.xlabel("Episode")
        plt.ylabel("Total Reward")
        plt.grid(True)
        plt.legend()
        print("\nPlotting rewards... Close the plot window to exit.")
        plt.show()

    except ImportError:
        print("\nMatplotlib not found. Skipping reward plot. Install with: pip install matplotlib")