| import numpy as np |
| from dataclasses import dataclass |
|
|
| import param_ |
| from drone import Drone |
| from dronemodel import DroneModel |
| from settings import Settings |
|
|
|
|
| @dataclass |
| class Team: |
| """ |
| Creates a team (it is either red or blue / foe or friend |
| """ |
|
|
| is_blue: bool |
| drones: [Drone] |
| drone_model: DroneModel |
| weighted_distance: float = 0 |
|
|
| def reset(self, obs=None): |
|
|
| self.delta_weighted_distance() |
| if obs: |
| for drone, obs in zip(self.drones, obs): |
| drone.reset(obs=obs) |
| else: |
| for drone in self.drones: |
| drone.reset() |
|
|
| def get_observation(self) -> np.ndarray: |
| """ |
| get the observation for the RL agent |
| :return: observation in the form of flatten np.arrays of shape(squad_number, 6*squad_size) |
| """ |
| obs = np.array([drone.get_observation() for drone in self.drones]) |
| deads = ~np.array([drone.is_alive for drone in self.drones]) |
|
|
| return obs, deads |
|
|
| def step(self, action: np.ndarray): |
| obs = np.zeros((len(self.drones), 6)) |
| done = np.zeros((len(self.drones),)) |
| reward = np.zeros((len(self.drones),)) |
| infos = [{} for d in range(len(self.drones))] |
| for index, drone in enumerate(self.drones): |
| obs[index], reward[index], done[index], infos[index] = drone.step(action[index]) |
| done = (sum(done) == len(self.drones)) |
| info = {'oob': 0, 'hits_target': 0, 'ttl': param_.DURATION, 'distance_to_straight_action': 0} |
| for i in infos: |
| info['ttl'] = min(info['ttl'], i['ttl']) |
| info['oob'] += i['oob'] if 'oob' in i else 0 |
| info['hits_target'] += i['hits_target'] if 'hits_target' in i else 0 |
| info['delta_distance'] = 0 if self.is_blue else self.delta_weighted_distance() |
| info['distance_to_straight_action'] += i['distance_to_straight_action'] \ |
| if 'distance_to_straight_action' in i else 0 |
| return obs, sum(reward), done, info |
|
|
| def delta_weighted_distance(self): |
|
|
| |
| team_distance = np.array([d.distance() for d in self.drones if d.is_alive]) |
| weighted_distance = np.sum(np.exp(-0.5 * (team_distance / (Settings.perimeter/2)) ** 2)) |
|
|
| delta = weighted_distance - self.weighted_distance if 0 < self.weighted_distance else 0 |
|
|
| self.weighted_distance = weighted_distance |
|
|
| return delta |
|
|
|
|
| class BlueTeam(Team): |
| """ |
| Creates the blue team |
| """ |
|
|
| def __init__(self, number_of_drones: int = Settings.blues): |
| self.is_blue = True |
| self.drone_model = DroneModel(self.is_blue) |
|
|
| |
| positions = np.zeros((number_of_drones, 3)) |
| speeds = np.zeros((number_of_drones, 3)) |
| blue_speed = Settings.blue_speed_init * self.drone_model.max_speed |
| circle = index = 0 |
| for d in range(number_of_drones): |
| positions[d] = np.array([Settings.blue_circles_rho[circle], |
| Settings.blue_circles_theta[circle] + index * 2 * np.pi / 3, |
| Settings.blue_circles_zed[circle]]) |
| clockwise = 1 - 2 * (circle % 2) |
| speeds[d] = np.array([blue_speed, np.pi / 6 * clockwise, 0]) |
| index += 1 |
| if index == Settings.blues_per_circle[circle]: |
| index = 0 |
| circle += 1 |
|
|
| self.drones = [Drone(is_blue=True, position=position, speed=speed, id_=id_) |
| for (id_, position, speed) in zip(range(number_of_drones), positions, speeds)] |
|
|
|
|
| class RedTeam(Team): |
| """ |
| Creates the red team |
| """ |
|
|
| def __init__(self, number_of_drones: int = Settings.reds): |
| self.is_blue = False |
| self.drone_model = DroneModel(self.is_blue) |
|
|
| positions = np.zeros((number_of_drones, 3)) |
| positions_noise = np.zeros((number_of_drones, 3)) |
| speeds = np.zeros((number_of_drones, 3)) |
| speed_rho = Settings.red_speed_init * self.drone_model.max_speed |
| squad = index = 0 |
| for d in range(number_of_drones): |
| positions[d] = [Settings.red_squads_rho[squad], |
| Settings.red_squads_theta[squad], |
| Settings.red_squads_zed[squad]] |
| positions_noise[d] = [Settings.red_rho_noise[squad], |
| Settings.red_theta_noise[squad], |
| Settings.red_zed_noise[squad]] |
| speeds[d] = [speed_rho, np.pi + positions[d][1], 0] |
| speeds[d] = [speed_rho, np.pi + positions[d][1], 0] |
| index += 1 |
| if index == Settings.red_squads[squad]: |
| index = 0 |
| squad += 1 |
|
|
| self.drones = [Drone(is_blue=False, position=position, position_noise=position_noise, speed=speed, id_=id_) |
| for (id_, position, position_noise, speed) in |
| zip(range(len(positions)), positions, positions_noise, speeds)] |
|
|