| |
| import torch |
| import numpy as np |
| import matplotlib.pyplot as plt |
|
|
|
|
| class TrajectoryGenerator(object): |
| def __init__(self, options, place_cells): |
| self.options = options |
| self.place_cells = place_cells |
|
|
| def plot_trajectory(self, traj, box_width, box_height, idx=0, step=2): |
| """ |
| Visualize one trajectory from traj dict. |
| |
| Args: |
| traj: dictionary containing trajectory info |
| box_width, box_height: dimensions of the environment |
| idx: which trajectory to plot from the batch (default: 0) |
| step: plot an arrow every 'step' frames |
| """ |
| |
| x = traj["target_x"][idx] |
| y = traj["target_y"][idx] |
| hd = traj["target_hd"][idx] |
|
|
| |
| x0 = traj["init_x"][idx, 0] |
| y0 = traj["init_y"][idx, 0] |
| hd0 = traj["init_hd"][idx, 0] |
|
|
| x = np.concatenate([[x0], x]) |
| y = np.concatenate([[y0], y]) |
| hd = np.concatenate([[hd0], hd]) |
|
|
| |
| plt.figure(figsize=(6, 6)) |
| plt.plot(x, y, "-o", markersize=2, label="trajectory") |
|
|
| |
| for t in range(0, len(x), step): |
| dx = 0.1 * np.cos(hd[t]) |
| dy = 0.1 * np.sin(hd[t]) |
| plt.arrow( |
| x[t], y[t], dx, dy, head_width=0.05, head_length=0.08, fc="r", ec="r" |
| ) |
|
|
| |
| plt.axhline(y=-box_height / 2, color="k") |
| plt.axhline(y=box_height / 2, color="k") |
| plt.axvline(x=-box_width / 2, color="k") |
| plt.axvline(x=box_width / 2, color="k") |
|
|
| plt.xlim([-box_width / 2 - 0.2, box_width / 2 + 0.2]) |
| plt.ylim([-box_height / 2 - 0.2, box_height / 2 + 0.2]) |
| plt.gca().set_aspect("equal", adjustable="box") |
| plt.xlabel("x position (m)") |
| plt.ylabel("y position (m)") |
| plt.title(f"Trajectory {idx}") |
| plt.legend() |
| plt.show() |
|
|
| def avoid_wall(self, position, hd, box_width, box_height): |
| """ |
| Compute distance and angle to nearest wall |
| """ |
| x = position[:, 0] |
| y = position[:, 1] |
| dists = [ |
| box_width / 2 - x, |
| box_height / 2 - y, |
| box_width / 2 + x, |
| box_height / 2 + y, |
| ] |
| d_wall = np.min(dists, axis=0) |
| angles = np.arange(4) * np.pi / 2 |
| theta = angles[np.argmin(dists, axis=0)] |
| hd = np.mod(hd, 2 * np.pi) |
| a_wall = hd - theta |
| a_wall = np.mod(a_wall + np.pi, 2 * np.pi) - np.pi |
|
|
| is_near_wall = (d_wall < self.border_region) * (np.abs(a_wall) < np.pi / 2) |
| turn_angle = np.zeros_like(hd) |
| turn_angle[is_near_wall] = np.sign(a_wall[is_near_wall]) * ( |
| np.pi / 2 - np.abs(a_wall[is_near_wall]) |
| ) |
|
|
| return is_near_wall, turn_angle |
|
|
| def generate_trajectory(self, box_width, box_height, batch_size): |
| """Generate a random walk in a rectangular box""" |
| samples = self.options.sequence_length |
| dt = 0.02 |
| sigma = 5.76 * 2 |
| b = 0.13 * 2 * np.pi |
| mu = 0 |
| self.border_region = 0.03 |
|
|
| |
| position = np.zeros([batch_size, samples + 2, 2]) |
| head_dir = np.zeros([batch_size, samples + 2]) |
| position[:, 0, 0] = np.random.uniform(-box_width / 2, box_width / 2, batch_size) |
| position[:, 0, 1] = np.random.uniform( |
| -box_height / 2, box_height / 2, batch_size |
| ) |
| head_dir[:, 0] = np.random.uniform(0, 2 * np.pi, batch_size) |
| velocity = np.zeros([batch_size, samples + 2]) |
|
|
| |
| random_turn = np.random.normal(mu, sigma, [batch_size, samples + 1]) |
| random_vel = np.random.rayleigh(b, [batch_size, samples + 1]) |
| v = np.abs(np.random.normal(0, b * np.pi / 2, batch_size)) |
|
|
| for t in range(samples + 1): |
| |
| v = random_vel[:, t] |
| turn_angle = np.zeros(batch_size) |
|
|
| if not self.options.periodic: |
| |
| is_near_wall, turn_angle = self.avoid_wall( |
| position[:, t], head_dir[:, t], box_width, box_height |
| ) |
| v[is_near_wall] *= 0.25 |
|
|
| |
| turn_angle += dt * random_turn[:, t] |
|
|
| |
| velocity[:, t] = v * dt |
| update = velocity[:, t, None] * np.stack( |
| [np.cos(head_dir[:, t]), np.sin(head_dir[:, t])], axis=-1 |
| ) |
| position[:, t + 1] = position[:, t] + update |
|
|
| |
| head_dir[:, t + 1] = head_dir[:, t] + turn_angle |
|
|
| |
| if self.options.periodic: |
| position[:, :, 0] = ( |
| np.mod(position[:, :, 0] + box_width / 2, box_width) - box_width / 2 |
| ) |
| position[:, :, 1] = ( |
| np.mod(position[:, :, 1] + box_height / 2, box_height) - box_height / 2 |
| ) |
|
|
| head_dir = np.mod(head_dir + np.pi, 2 * np.pi) - np.pi |
|
|
| traj = {} |
| |
| traj["init_hd"] = head_dir[:, 0, None] |
| traj["init_x"] = position[:, 1, 0, None] |
| traj["init_y"] = position[:, 1, 1, None] |
|
|
| traj["ego_v"] = velocity[:, 1:-1] |
| ang_v = np.diff(head_dir, axis=-1) |
| traj["phi_x"], traj["phi_y"] = np.cos(ang_v)[:, :-1], np.sin(ang_v)[:, :-1] |
|
|
| |
| traj["target_hd"] = head_dir[:, 1:-1] |
| traj["target_x"] = position[:, 2:, 0] |
| traj["target_y"] = position[:, 2:, 1] |
|
|
| |
| |
| |
|
|
| return traj |
|
|
| def get_generator(self, batch_size=None, box_width=None, box_height=None): |
| """ |
| Returns a generator that yields batches of trajectories |
| """ |
| if not batch_size: |
| batch_size = self.options.batch_size |
| if not box_width: |
| box_width = self.options.box_width |
| if not box_height: |
| box_height = self.options.box_height |
|
|
| while True: |
| traj = self.generate_trajectory(box_width, box_height, batch_size) |
|
|
| v = np.stack( |
| [ |
| traj["ego_v"] * np.cos(traj["target_hd"]), |
| traj["ego_v"] * np.sin(traj["target_hd"]), |
| ], |
| axis=-1, |
| ) |
| v = torch.tensor(v, dtype=torch.float32).transpose(0, 1) |
|
|
| pos = np.stack([traj["target_x"], traj["target_y"]], axis=-1) |
| pos = torch.tensor(pos, dtype=torch.float32).transpose(0, 1) |
| |
| pos = pos.to(self.options.device) |
| place_outputs = self.place_cells.get_activation(pos) |
|
|
| init_pos = np.stack([traj["init_x"], traj["init_y"]], axis=-1) |
| init_pos = torch.tensor(init_pos, dtype=torch.float32) |
| init_pos = init_pos.to(self.options.device) |
| init_actv = self.place_cells.get_activation(init_pos).squeeze() |
|
|
| v = v.to(self.options.device) |
| inputs = (v, init_actv) |
|
|
| yield (inputs, place_outputs, pos) |
|
|
| def get_test_batch(self, batch_size=None, box_width=None, box_height=None): |
| """For testing performance, returns a batch of smample trajectories""" |
| if not batch_size: |
| batch_size = self.options.batch_size |
| if not box_width: |
| box_width = self.options.box_width |
| if not box_height: |
| box_height = self.options.box_height |
|
|
| traj = self.generate_trajectory(box_width, box_height, batch_size) |
|
|
| v = np.stack( |
| [ |
| traj["ego_v"] * np.cos(traj["target_hd"]), |
| traj["ego_v"] * np.sin(traj["target_hd"]), |
| ], |
| axis=-1, |
| ) |
| v = torch.tensor(v, dtype=torch.float32).transpose(0, 1) |
|
|
| pos = np.stack([traj["target_x"], traj["target_y"]], axis=-1) |
| pos = torch.tensor(pos, dtype=torch.float32).transpose(0, 1) |
| pos = pos.to(self.options.device) |
| place_outputs = self.place_cells.get_activation(pos) |
|
|
| init_pos = np.stack([traj["init_x"], traj["init_y"]], axis=-1) |
| init_pos = torch.tensor(init_pos, dtype=torch.float32) |
| init_pos = init_pos.to(self.options.device) |
| init_actv = self.place_cells.get_activation(init_pos).squeeze() |
|
|
| v = v.to(self.options.device) |
| inputs = (v, init_actv) |
|
|
| return (inputs, pos, place_outputs) |
|
|