| from make_env import GridWorldEnv
|
| import matplotlib.pyplot as plt
|
| import numpy as np
|
| import itertools
|
| import random
|
| import concurrent.futures
|
|
|
|
|
class Algorithm_Agent():
    """Plan a visiting order over the labelled grid cells and the moves to follow it.

    On construction the agent collects every cell whose label is not -1,
    orders the cells with a greedy insertion heuristic followed by randomized
    re-insertion (``arrange_points``), and converts that order into a list of
    integer actions (``plan_action``).

    Attributes set during construction:
        path, path_category: visiting order (``[x, y]`` lists) and the label
            of each visited cell.
        actions: integer action list produced by ``plan_action``.
        cumulated_reward: score of the final ordering, computed as
            ``100 + 36 - cost / 10``.
    """

    # Cells of one category are inserted in batches of this size, and every
    # 4th occurrence of a category along a path is flagged by get_elim_path.
    _GROUP_SIZE = 4

    def __init__(self, num_categories, grid_size, grid, probs, loc):
        """Store the problem description, then plan the path and the actions.

        Args:
            num_categories: number of distinct labels; grid labels other than
                -1 must lie in ``range(num_categories)``.
            grid_size: ``(rows, cols)`` extent of ``grid`` to scan.
            grid: 2-D array of labels indexed as ``grid[x, y]``; -1 marks a
                cell that is skipped.
            probs: 2-D array indexed like ``grid`` with one value per cell.
            loc: starting ``(x, y)`` location of the agent.
        """
        self.num_categories = num_categories
        self.grid_size = grid_size
        self.grid = grid
        self.probs = probs
        self.loc = loc
        self.current_loc = [loc[0], loc[1]]
        self.path, self.path_category = self.arrange_points()
        self.actions = self.plan_action()

    def calculate_length(self, paths, elim_paths, prob_paths):
        """Return the heuristic cost of each candidate path.

        Args:
            paths: array of shape ``(P, N, 2)`` holding P candidate paths.
            elim_paths: array of shape ``(P, N)`` as built by
                ``get_elim_path``.
            prob_paths: array of shape ``(P, N)`` with the per-cell values
                taken from ``probs``.

        Returns:
            Array of shape ``(P,)``; lower is better.
        """
        paths = np.asarray(paths)
        elim_paths = np.asarray(elim_paths)

        # Manhattan distance between consecutive cells, plus 1 per step.
        lengths = np.sum(np.abs(paths[:, :-1] - paths[:, 1:]), axis=-1) + 1
        motion_length = (
            np.sum(lengths, axis=-1)
            + np.sum(np.abs(self.loc - paths[:, 0]), axis=-1)
            + 1
        )

        # Per-row suffix sums: cost still to be travelled from each step on.
        # The flips are restricted to the last axis so rows are never mixed.
        suffix = np.flip(np.cumsum(np.flip(lengths, axis=-1), axis=-1), axis=-1)
        cum_lengths = suffix / 14.4  # NOTE(review): 14.4 kept as-is; meaning not visible here

        # Steps flagged in elim_paths get a weight of (1 - 4) instead of 1.
        load_length = (
            np.sum(cum_lengths, axis=-1)
            - 4 * np.sum(cum_lengths * elim_paths[:, :-1], axis=-1)
        )

        # Position-weighted sum of the per-cell values; its weight below is
        # 0.0, so it currently does not influence the ranking.
        prob_length = np.sum(np.arange(len(prob_paths[-1])) * prob_paths)

        return motion_length + load_length + 0.0 * prob_length

    def get_elim_path(self, category_paths):
        """Flag every position that completes a group of four equal labels.

        Args:
            category_paths: array of shape ``(P, N)`` of labels.

        Returns:
            Array like ``category_paths`` whose entry ``[p, i]`` is 1 when the
            label at ``i`` has appeared ``4k - 1`` times earlier in row ``p``
            (so this occurrence is the 4th, 8th, ...), otherwise 0.
        """
        elim_path = np.zeros_like(category_paths)
        for i in range(1, category_paths.shape[1]):
            earlier = category_paths[:, :i]
            same_category_count = np.sum(
                earlier == category_paths[:, i:i + 1], axis=-1
            )
            elim_path[:, i] = (same_category_count + 1) % self._GROUP_SIZE == 0
        return elim_path

    def find_shortest_path(self, points):
        """Brute-force the ordering of ``points`` with minimal Manhattan length.

        Tries every permutation, so it is only suitable for very small inputs
        (it is called with at most four points). Ties keep the first
        permutation found.

        Returns:
            ``(ordered_points, total_length)``.
        """
        min_path = None
        min_length = float('inf')
        for perm in itertools.permutations(points):
            length = sum(
                np.sum(np.abs(np.array(a) - np.array(b)))
                for a, b in zip(perm, perm[1:])
            )
            if length < min_length:
                min_length = length
                min_path = list(perm)
        return min_path, min_length

    def insert_point(self, path, category_path, prob_path, point, category, prob):
        """Find the cheapest position at which to insert ``point`` into ``path``.

        Every insertion index from 0 to ``len(path)`` is evaluated with
        ``calculate_length``; the inputs are not modified.

        Returns:
            ``(best_position, min_length)``: the insertion index with the
            lowest cost and that cost.
        """
        num_slots = len(path) + 1
        new_path = np.zeros((num_slots, num_slots, 2))
        new_category_path = np.zeros((num_slots, num_slots))
        new_prob_path = np.zeros((num_slots, num_slots))
        for slot in range(num_slots):
            new_path[slot] = np.insert(path, slot, point, axis=0)
            new_category_path[slot] = np.insert(category_path, slot, category, axis=0)
            new_prob_path[slot] = np.insert(prob_path, slot, prob, axis=0)

        new_elim_path = self.get_elim_path(new_category_path)
        lengths = self.calculate_length(new_path, new_elim_path, new_prob_path)
        return np.argmin(lengths), np.min(lengths)

    def _plan_cost(self, path, category_path, prob_path):
        """Cost of a single, complete (non-empty) ordering."""
        categories = np.array([category_path])
        return self.calculate_length(
            np.array([path]), self.get_elim_path(categories), np.array([prob_path])
        )[0]

    def arrange_points(self, refine_iterations=1000):
        """Build the visiting order and store its score in ``cumulated_reward``.

        Cells are grouped by label (labels visited in random order), inserted
        greedily in batches of four, and then refined by repeatedly removing
        a random cell and re-inserting it at its cheapest position.

        Args:
            refine_iterations: number of remove/re-insert refinement rounds.

        Returns:
            ``(path, category_path)``: the ordered ``[x, y]`` cells and the
            label of each. Both are empty when the grid has no labelled cell,
            in which case ``cumulated_reward`` is the zero-cost score.
        """
        shuffled = random.sample(range(self.num_categories), self.num_categories)
        points_by_category = {category: [] for category in shuffled}
        for x in range(self.grid_size[0]):
            for y in range(self.grid_size[1]):
                category = self.grid[x, y]
                if category != -1:
                    points_by_category[category].append([x, y])

        path, category_path, prob_path = [], [], []
        for category, points in points_by_category.items():
            for start in range(0, len(points), self._GROUP_SIZE):
                subset = points[start:start + self._GROUP_SIZE]
                if not path:
                    # First batch: small enough to order exhaustively.
                    path, _ = self.find_shortest_path(subset)
                    category_path = [category] * len(path)
                    prob_path = [self.probs[p[0], p[1]] for p in path]
                    continue
                for point in subset:
                    prob = self.probs[point[0], point[1]]
                    position, _ = self.insert_point(
                        path, category_path, prob_path, point, category, prob
                    )
                    path.insert(position, point)
                    category_path.insert(position, category)
                    prob_path.insert(position, prob)

        if not path:
            # Nothing to visit: skip refinement instead of popping from an
            # empty list, and report the score of a zero-cost plan.
            self.cumulated_reward = 100 + 36
            return path, category_path

        length = None
        for _ in range(refine_iterations):
            # Draw from the actual path length; a fixed upper bound would
            # index past the end whenever fewer cells are labelled.
            index = np.random.randint(0, len(path))
            point = path.pop(index)
            category = category_path.pop(index)
            prob = prob_path.pop(index)
            position, length = self.insert_point(
                path, category_path, prob_path, point, category, prob
            )
            path.insert(position, point)
            category_path.insert(position, category)
            prob_path.insert(position, prob)

        if length is None:
            # No refinement round ran; score the greedy ordering directly.
            length = self._plan_cost(path, category_path, prob_path)
        self.cumulated_reward = 100 + 36 - length / 10

        return path, category_path

    def plan_action(self):
        """Translate ``self.path`` into a list of integer actions.

        Starting from ``self.current_loc`` the agent moves one cell at a time:
        action 0 increments the first coordinate, 1 increments the second,
        2 decrements the first and 3 decrements the second. Action 4 is
        appended each time a path cell is reached. ``self.current_loc`` is
        left at the last visited cell.
        """
        actions = []
        for target in self.path:
            while (self.current_loc[0] != target[0]
                   or self.current_loc[1] != target[1]):
                cur_x, cur_y = self.current_loc[0], self.current_loc[1]
                if cur_x < target[0]:
                    actions.append(0)
                    self.current_loc = [cur_x + 1, cur_y]
                elif cur_y < target[1]:
                    actions.append(1)
                    self.current_loc = [cur_x, cur_y + 1]
                elif cur_x > target[0]:
                    actions.append(2)
                    self.current_loc = [cur_x - 1, cur_y]
                else:
                    actions.append(3)
                    self.current_loc = [cur_x, cur_y - 1]
            actions.append(4)
        return actions
|
|
|
def adjust_grid(predictions, openmax_probs, grid_shape=(12, 12)):
    """Nudge per-category counts toward multiples of four and reshape to a grid.

    Three passes run over categories 0..19, each looking at the category's
    count modulo 4:

    1. remainder 1: the member with the lowest score for that category is
       reassigned to its next-best category;
    2. remainder 2: twice, the non-member with the highest score for that
       category is pulled in;
    3. remainder 3: once, the non-member with the highest score is pulled in.

    Args:
        predictions: 1-D integer array of predicted labels, one per cell.
            It is updated in place, as before.
        openmax_probs: array of shape ``(len(predictions), num_classes)`` with
            a score per cell and class. It is NOT modified; the masking in
            pass 1 is applied to a private copy.
        grid_shape: shape the flat results are reshaped to.

    Returns:
        ``(labels, scores)``, both of shape ``grid_shape``: the adjusted
        labels and, for each cell, the score of its final label.
    """
    num_known = 20  # only categories 0..19 are rebalanced; counts padded to 21 slots

    # Pass 1 writes a -1 sentinel so a reassigned cell is not picked for the
    # same category again. Doing that on the caller's array would corrupt it.
    working_probs = np.array(openmax_probs, copy=True)
    class_counts = np.bincount(predictions, minlength=num_known + 1)

    def pull_best_candidate(category):
        """Move the most likely non-member of ``category`` into it."""
        outsiders = np.where(predictions != category)[0]
        if len(outsiders) == 0:
            return
        best_idx = outsiders[np.argmax(working_probs[outsiders, category])]
        class_counts[predictions[best_idx]] -= 1
        class_counts[category] += 1
        predictions[best_idx] = category

    # Pass 1: one surplus member -> give the weakest one away.
    for category in range(num_known):
        if class_counts[category] % 4 != 1:
            continue
        members = np.where(predictions == category)[0]
        if len(members) == 0:
            continue
        worst_idx = members[np.argmin(working_probs[members, category])]
        working_probs[worst_idx, category] = -1
        new_category = np.argmax(working_probs[worst_idx])
        class_counts[category] -= 1
        class_counts[new_category] += 1
        predictions[worst_idx] = new_category

    # Pass 2: two short of a multiple of four -> pull in two cells.
    for category in range(num_known):
        if class_counts[category] % 4 == 2:
            for _ in range(2):
                pull_best_candidate(category)

    # Pass 3: one short of a multiple of four -> pull in one cell.
    for category in range(num_known):
        if class_counts[category] % 4 == 3:
            pull_best_candidate(category)

    probs = working_probs[np.arange(len(predictions)), predictions]

    return predictions.reshape(grid_shape), probs.reshape(grid_shape)
|
|
|
def search_once(grid, probs, loc):
    """Run one randomized planning pass on a 12x12 grid with 21 categories.

    Returns:
        ``(actions, cumulated_reward)`` taken from the constructed
        ``Algorithm_Agent``.
    """
    planner = Algorithm_Agent(
        num_categories=21,
        grid_size=(12, 12),
        grid=grid,
        probs=probs,
        loc=loc,
    )
    return planner.actions, planner.cumulated_reward
|
|
|
|
|
def _search_once_reseeded(grid, probs, loc):
    """Worker entry point: reseed NumPy's global RNG, then run one search pass.

    Worker processes created by fork start with a copy of the parent's NumPy
    global RNG state, so without reseeding they would all draw the same
    refinement indices. Defined at module level so it can be pickled for
    ``ProcessPoolExecutor``.
    """
    np.random.seed()
    return search_once(grid, probs, loc)


def search(grid, probs, loc, num_iterations=60):
    """Run several independent planning passes in parallel and keep the best.

    Each pass is executed in a worker process on copies of the inputs. The
    plan with the highest reported reward is then replayed in a fresh
    ``GridWorldEnv`` and the reward accumulated there is printed.

    Args:
        grid: label grid handed to every pass and used for the replay.
        probs: per-cell values handed to every pass.
        loc: start location handed to every pass and used for the replay.
        num_iterations: number of independent passes; must be at least 1.

    Returns:
        The action list of the best pass.

    Raises:
        ValueError: if ``num_iterations`` is less than 1.
    """
    import os

    if num_iterations < 1:
        raise ValueError("num_iterations must be at least 1")

    # One process per pass only oversubscribes the CPU; cap at the core
    # count, and at 61 because larger pools are rejected on Windows.
    worker_count = min(num_iterations, os.cpu_count() or 1, 61)

    with concurrent.futures.ProcessPoolExecutor(max_workers=worker_count) as executor:
        futures = [
            executor.submit(_search_once_reseeded, grid.copy(), probs.copy(), loc.copy())
            for _ in range(num_iterations)
        ]
        results = [
            future.result()
            for future in concurrent.futures.as_completed(futures)
        ]

    optim_actions, _best_reward = max(results, key=lambda result: result[1])

    # Replay the winning plan to report the reward the environment gives it.
    env = GridWorldEnv()
    cumulated_reward = 0
    env.reset()
    env.grid, env.loc = grid.copy(), loc.copy()
    for action in optim_actions:
        _obs, reward, _done, _truncated, _info = env.step(action)
        cumulated_reward += reward
    print(f'Final reward: {cumulated_reward}')

    return optim_actions
|
|
|
|
|
if __name__ == "__main__":
    # Smoke run: plan on a freshly reset environment and print the reward.
    for _ in range(1):
        test_env = GridWorldEnv()
        test_env.reset()
        grid, loc = test_env.grid.copy(), test_env.loc.copy()

        # Copy of the grid with the labels of five random cells rotated among
        # each other.
        # NOTE(review): pred_grid / pred_loc are built but not passed to
        # search(); kept as in the original - confirm whether the planner
        # was meant to run on the perturbed grid.
        pred_grid, pred_loc = test_env.grid.copy(), test_env.loc.copy()
        cells = [random.sample(range(12), 2) for _ in range(5)]
        labels = [pred_grid[cell[0], cell[1]] for cell in cells]
        for cell, source in zip(cells, (1, 4, 0, 2, 3)):
            pred_grid[cell[0], cell[1]] = labels[source]

        # search() expects (grid, probs, loc). No per-cell confidences exist
        # here, so pass a uniform placeholder with the grid's shape.
        probs = np.ones(grid.shape)
        search(grid, probs, loc)
|
|
|
|
|