Spaces:
Running
on
Zero
Running
on
Zero
#######################################################################
# Name: worker.py
#
# - Runs robot in environment for N steps
# - Collects & Returns S(t), A(t), R(t), S(t+1)
#######################################################################
| from .parameter import * | |
| import os | |
| import json | |
| import copy | |
| import imageio | |
| import numpy as np | |
| import torch | |
| from time import time | |
| from .env import Env | |
| from .robot import Robot | |
class Worker:
    """Rolls out one multi-robot episode in an Env and collects experience.

    Drives ``n_agent`` Robot instances through an Env for up to
    NUM_EPS_STEPS steps using ``policy_net``, storing each robot's
    S(t), A(t), R(t), S(t+1) transitions in ``self.episode_buffer`` and
    summary statistics in ``self.perf_metrics``.

    Relies on module-level constants imported from ``.parameter``
    (NODE_PADDING_SIZE, K_SIZE, NUM_EPS_STEPS, GIFS_PATH,
    MASKS_RAND_INDICES_PATH, YELLOW, NC).
    """

    def __init__(self, meta_agent_id, n_agent, policy_net, q_net, global_step,
                 device='cuda', greedy=False, save_image=False, clip_seg_tta=None):
        """Create the environment and robots for one episode.

        Args:
            meta_agent_id: id of the meta-agent that owns this worker.
            n_agent: number of robots to spawn.
            policy_net: policy network used by select_node().
            q_net: critic network (stored; not used elsewhere in this file).
            global_step: current global step; also used as the Env map index.
            device: torch device for observation tensors.
            greedy: if True, take argmax actions instead of sampling.
            save_image: if True, plot every step and build a gif at the end.
            clip_seg_tta: optional segmentation/TTA helper (only stored here).
        """
        self.device = device
        self.greedy = greedy
        self.n_agent = n_agent
        self.metaAgentID = meta_agent_id
        self.global_step = global_step
        self.node_padding_size = NODE_PADDING_SIZE
        self.k_size = K_SIZE
        self.save_image = save_image
        self.clip_seg_tta = clip_seg_tta

        # Randomize map_index: optionally remap this episode's mask index
        # through a pre-shuffled index list loaded from disk.
        mask_index = None
        if MASKS_RAND_INDICES_PATH != "":
            with open(MASKS_RAND_INDICES_PATH, 'r') as f:
                mask_index_rand_json = json.load(f)
            # Wrap around so any number of episodes reuses the list cyclically.
            mask_index = mask_index_rand_json[self.global_step % len(mask_index_rand_json)]
            print("mask_index: ", mask_index)

        self.env = Env(map_index=self.global_step, n_agent=n_agent, k_size=self.k_size,
                       plot=save_image, mask_index=mask_index)
        self.local_policy_net = policy_net
        self.local_q_net = q_net

        # One Robot per agent, placed at the Env-provided start positions.
        self.robot_list = []
        self.all_robot_positions = []
        for i in range(self.n_agent):
            robot_position = self.env.start_positions[i]
            robot = Robot(robot_id=i, position=robot_position, plot=save_image)
            self.robot_list.append(robot)
            self.all_robot_positions.append(robot_position)

        self.perf_metrics = dict()
        # 15 parallel sub-buffers; run_episode() merges each robot's
        # equally-shaped 15-slot episode buffer into these.
        self.episode_buffer = []
        for i in range(15):
            self.episode_buffer.append([])

    def run_episode(self, curr_episode):
        """Run one episode and populate episode_buffer / perf_metrics.

        Args:
            curr_episode: episode number, used for logging and the gif name.
        """
        eps_start = time()
        done = False

        # Seed each robot with its initial observation.
        for robot_id, deciding_robot in enumerate(self.robot_list):
            deciding_robot.observations = self.get_observations(deciding_robot.robot_position)

        ### Run episode ###
        for step in range(NUM_EPS_STEPS):
            next_position_list = []
            dist_list = []
            travel_dist_list = []
            dist_array = np.zeros((self.n_agent, 1))
            for robot_id, deciding_robot in enumerate(self.robot_list):
                observations = deciding_robot.observations
                deciding_robot.save_observations(observations)

                ### Forward pass through policy to get next position ###
                next_position, action_index = self.select_node(observations)
                deciding_robot.save_action(action_index)
                dist = np.linalg.norm(next_position - deciding_robot.robot_position)

                ### Log results of action (e.g. distance travelled) ###
                dist_array[robot_id] = dist
                dist_list.append(dist)
                travel_dist_list.append(deciding_robot.travel_dist)
                next_position_list.append(next_position)
                self.all_robot_positions[robot_id] = next_position

            # Order robots by hop distance so the closest mover "arrives" first;
            # all per-step arrays are re-sorted into that arrival order.
            arriving_sequence = np.argsort(dist_list)
            next_position_list = np.array(next_position_list)
            dist_list = np.array(dist_list)
            travel_dist_list = np.array(travel_dist_list)
            next_position_list = next_position_list[arriving_sequence]
            dist_list = dist_list[arriving_sequence]
            travel_dist_list = travel_dist_list[arriving_sequence]

            ### Take Action (Deconflict if 2 agents choose the same target position) ###
            next_position_list, dist_list = self.solve_conflict(arriving_sequence, next_position_list, dist_list)
            reward_list, done = self.env.multi_robot_step(next_position_list, dist_list, travel_dist_list)

            ### Update observations + rewards from action ###
            # reward_list is aligned with arriving_sequence (arrival order).
            for reward, robot_id in zip(reward_list, arriving_sequence):
                robot = self.robot_list[robot_id]
                robot.observations = self.get_observations(robot.robot_position)
                robot.save_trajectory_coords(self.env.find_index_from_coords(robot.robot_position),
                                             self.env.num_new_targets_found)
                robot.save_reward_done(reward, done)
                robot.save_next_observations(robot.observations)

            ### Save a frame to generate gif of robot trajectories ###
            if self.save_image:
                robots_route = []
                for robot in self.robot_list:
                    robots_route.append([robot.xPoints, robot.yPoints])
                if not os.path.exists(GIFS_PATH):
                    os.makedirs(GIFS_PATH)
                self.env.plot_env(self.global_step, GIFS_PATH, step, max(travel_dist_list), robots_route)

            if done:
                break

        # Merge every robot's 15-slot episode buffer into the worker's buffer.
        for robot in self.robot_list:
            for i in range(15):
                self.episode_buffer[i] += robot.episode_buffer[i]

        # Episode summary metrics (success_rate stores the final done flag).
        self.perf_metrics['travel_dist'] = max(travel_dist_list)
        self.perf_metrics['explored_rate'] = self.env.explored_rate
        self.perf_metrics['targets_found'] = self.env.targets_found_rate
        self.perf_metrics['success_rate'] = done

        # save gif
        if self.save_image:
            path = GIFS_PATH
            self.make_gif(path, curr_episode)

        print(YELLOW, f"[Eps {curr_episode} Completed] Time Taken: {time()-eps_start:.2f}s, Steps: {step+1}", NC)

    def get_observations(self, robot_position):
        """ Get robot's sensor observation of environment given position """
        current_node_index = self.env.find_index_from_coords(robot_position)
        current_index = torch.tensor([current_node_index]).unsqueeze(0).unsqueeze(0).to(self.device)  # (1,1,1)

        # Deep-copy env state so tensor construction cannot mutate the env.
        node_coords = copy.deepcopy(self.env.node_coords)
        graph = copy.deepcopy(self.env.graph)
        node_utility = copy.deepcopy(self.env.node_utility)
        guidepost = copy.deepcopy(self.env.guidepost)
        segmentation_info_mask = copy.deepcopy(self.env.filtered_seg_info_mask)

        n_nodes = node_coords.shape[0]
        node_coords = node_coords / 640   # normalize coords (640 presumably the map dimension -- TODO confirm)
        node_utility = node_utility / 50  # normalize utility (50 presumably its max -- TODO confirm)
        node_utility_inputs = node_utility.reshape((n_nodes, 1))

        # Mark robot-occupied nodes: -1 for this robot's own node, 1 for others.
        occupied_node = np.zeros((n_nodes, 1))
        for position in self.all_robot_positions:
            index = self.env.find_index_from_coords(position)
            if index == current_index.item():
                occupied_node[index] = -1
            else:
                occupied_node[index] = 1

        # NOTE(review): node_utility_inputs and occupied_node are computed above
        # but are not part of node_inputs below -- possibly dead code; confirm.
        node_inputs = np.concatenate((node_coords, segmentation_info_mask, guidepost), axis=1)
        node_inputs = torch.FloatTensor(node_inputs).unsqueeze(0).to(self.device)  # (1, node_padding_size+1, 3)

        # Zero-pad node features up to the fixed node_padding_size rows.
        assert node_coords.shape[0] < self.node_padding_size
        padding = torch.nn.ZeroPad2d((0, 0, 0, self.node_padding_size - node_coords.shape[0]))
        node_inputs = padding(node_inputs)

        # Padding mask over nodes: 0 = real node, 1 = padded slot.
        node_padding_mask = torch.zeros((1, 1, node_coords.shape[0]), dtype=torch.int64).to(self.device)
        node_padding = torch.ones((1, 1, self.node_padding_size - node_coords.shape[0]), dtype=torch.int64).to(
            self.device)
        node_padding_mask = torch.cat((node_padding_mask, node_padding), dim=-1)

        # Adjacency: one neighbor-index list per node.
        graph = list(graph.values())
        edge_inputs = []
        for node in graph:
            node_edges = list(map(int, node))
            edge_inputs.append(node_edges)

        # Attention bias matrix: 0 where an edge exists, 1 elsewhere; padded with 1s.
        bias_matrix = self.calculate_edge_mask(edge_inputs)
        edge_mask = torch.from_numpy(bias_matrix).float().unsqueeze(0).to(self.device)
        assert len(edge_inputs) < self.node_padding_size
        padding = torch.nn.ConstantPad2d(
            (0, self.node_padding_size - len(edge_inputs), 0, self.node_padding_size - len(edge_inputs)), 1)
        edge_mask = padding(edge_mask)

        padding2 = torch.nn.ZeroPad2d((0, 0, 0, self.node_padding_size - len(edge_inputs)))
        # Right-pad each neighbor list with 0s up to k_size entries.
        for edges in edge_inputs:
            while len(edges) < self.k_size:
                edges.append(0)
        edge_inputs = torch.tensor(edge_inputs).unsqueeze(0).to(self.device)  # (1, node_padding_size+1, k_size)
        edge_inputs = padding2(edge_inputs)

        # NOTE(review): neighbor entries equal to 0 are masked as padding, which
        # would also mask genuine edges to node index 0 -- confirm intended.
        edge_padding_mask = torch.zeros((1, len(edge_inputs), K_SIZE), dtype=torch.int64).to(self.device)
        one = torch.ones_like(edge_padding_mask, dtype=torch.int64).to(self.device)
        edge_padding_mask = torch.where(edge_inputs == 0, one, edge_padding_mask)

        observations = node_inputs, edge_inputs, current_index, node_padding_mask, edge_padding_mask, edge_mask
        return observations

    def select_node(self, observations):
        """ Forward pass through policy to get next position to go to on map """
        node_inputs, edge_inputs, current_index, node_padding_mask, edge_padding_mask, edge_mask = observations
        # Inference only: no gradients needed for action selection.
        with torch.no_grad():
            logp_list = self.local_policy_net(node_inputs, edge_inputs, current_index, node_padding_mask,
                                              edge_padding_mask, edge_mask)

        # Greedy: argmax over log-probs; otherwise sample from the policy.
        if self.greedy:
            action_index = torch.argmax(logp_list, dim=1).long()
        else:
            action_index = torch.multinomial(logp_list.exp(), 1).long().squeeze(1)

        # The action indexes into the current node's neighbor list.
        next_node_index = edge_inputs[:, current_index.item(), action_index.item()]
        next_position = self.env.node_coords[next_node_index]
        return next_position, action_index

    def solve_conflict(self, arriving_sequence, next_position_list, dist_list):
        """ Deconflict if 2 agents choose the same target position """
        # NOTE(review): the actual deconfliction logic below is commented out,
        # so this currently just commits each robot's chosen move (robots may
        # share a target node) and updates travel_dist / robot_position in place.
        for j, [robot_id, next_position] in enumerate(zip(arriving_sequence, next_position_list)):
            moving_robot = self.robot_list[robot_id]
            # if next_position[0] + next_position[1] * 1j in (next_position_list[:, 0] + next_position_list[:, 1] * 1j)[:j]:
            #     dist_to_next_position = np.argsort(np.linalg.norm(self.env.node_coords - next_position, axis=1))
            #     k = 0
            #     while next_position[0] + next_position[1] * 1j in (next_position_list[:, 0] + next_position_list[:, 1] * 1j)[:j]:
            #         k += 1
            #         next_position = self.env.node_coords[dist_to_next_position[k]]
            dist = np.linalg.norm(next_position - moving_robot.robot_position)
            next_position_list[j] = next_position
            dist_list[j] = dist
            moving_robot.travel_dist += dist
            moving_robot.robot_position = next_position
        return next_position_list, dist_list

    def work(self, currEpisode):
        '''
        Interacts with the environment. The agent gets either gradients or experience buffer
        '''
        self.run_episode(currEpisode)

    def calculate_edge_mask(self, edge_inputs):
        """Build an (n, n) attention bias matrix from adjacency lists.

        Entry [i][j] is 0 when node j is a neighbor of node i, else 1.
        """
        size = len(edge_inputs)
        bias_matrix = np.ones((size, size))
        for i in range(size):
            for j in range(size):
                if j in edge_inputs[i]:
                    bias_matrix[i][j] = 0
        return bias_matrix

    def make_gif(self, path, n):
        """ Generate a gif given list of images """
        # Gif filename encodes the episode number and final target-found rate.
        with imageio.get_writer('{}/{}_target_rate_{:.2f}.gif'.format(path, n, self.env.targets_found_rate), mode='I',
                                fps=5) as writer:
            for frame in self.env.frame_files:
                image = imageio.imread(frame)
                writer.append_data(image)
        print('gif complete\n')

        # Remove files: delete every per-step frame image except the last one.
        for filename in self.env.frame_files[:-1]:
            os.remove(filename)