Spaces:
Paused
Paused
| import math | |
| import random | |
| import numpy as np | |
| from trajdata.maps import VectorMap | |
| from submodules.Pplan.Sampling.spline_planner import SplinePlanner | |
| import torch | |
| import time | |
| import math | |
| from copy import deepcopy | |
| from utils.dynamic_utils import unicycle | |
| def constant_tracking(state, path, dt): | |
| ''' | |
| Args: | |
| state: current state of the vehicle, of size [x, y, yaw, speed] | |
| path: the path to follow, of size (N, [x, y, yaw]) | |
| dt: time duration | |
| ''' | |
| # find the nearest point in the path | |
| dists = torch.norm(path[:, :2] - state[None, :2], dim=1) | |
| nearest_index = torch.argmin(dists) | |
| # find the target point | |
| lookahead_distance = state[3] * dt | |
| target = path[-1] | |
| is_end = True | |
| for i in range(nearest_index + 1, len(path)): | |
| if torch.norm(path[i, :2] - state[:2]) > lookahead_distance: | |
| target = path[i] | |
| is_end = False | |
| break | |
| # compute the new state | |
| target_distance = torch.norm(target[:2] - state[:2]) | |
| ratio = lookahead_distance / target_distance.clamp(min=1e-6) | |
| ratio = ratio.clamp(max=1.0) | |
| new_state = deepcopy(state) | |
| new_state[:2] = state[:2] + ratio * (target[:2] - state[:2]) | |
| new_state[2] = torch.atan2( | |
| state[2].sin() + ratio * (target[2].sin() - state[2].sin()), | |
| state[2].cos() + ratio * (target[2].cos() - state[2].cos()) | |
| ) | |
| if is_end: | |
| new_state[3] = 0 | |
| return new_state | |
| def constant_headaway(states, num_steps, dt): | |
| ''' | |
| Args: | |
| states: current states of a batch of vehicles, of size (num_agents, [x, y, yaw, speed]) | |
| num_steps: number of steps to move forward | |
| dt: time duration | |
| Return: | |
| trajs: the trajectories of the vehicles, of size (num_agents, num_steps, [x, y, yaw, speed]) | |
| ''' | |
| # state: [x, y, yaw, speed] | |
| x = states[:, 0] | |
| y = states[:, 1] | |
| yaw = states[:, 2] | |
| speed = states[:, 3] | |
| # Generate time steps | |
| t_steps = torch.arange(num_steps) * dt | |
| # Calculate dx and dy for each step | |
| dx = torch.outer(speed * torch.sin(yaw), t_steps) | |
| dy = torch.outer(speed * torch.cos(yaw), t_steps) | |
| # Update x and y positions | |
| x_traj = x.unsqueeze(1) + dx | |
| y_traj = y.unsqueeze(1) + dy | |
| # Replicate the yaw and speed for each time step | |
| yaw_traj = yaw.unsqueeze(1).repeat(1, num_steps) | |
| speed_traj = speed.unsqueeze(1).repeat(1, num_steps) | |
| # Stack the x, y, yaw, and speed components to form the trajectory | |
| trajs = torch.stack((x_traj, y_traj, yaw_traj, speed_traj), dim=-1) | |
| return trajs | |
| class IDM: | |
| def __init__( | |
| self, v0=30.0, s0=5.0, T=2.0, a=2.0, b=4.0, delta=4.0, | |
| lookahead_path_length=100, lead_distance_threshold=1.0 | |
| ): | |
| ''' | |
| Args: | |
| v0: desired speed | |
| s0: minimum gap | |
| T: safe time headway | |
| a: max acceleration | |
| b: comfortable deceleration | |
| delta: acceleration exponent | |
| lookahead_path_length: the length of path to look ahead | |
| lead_distance_threshold: the distance to consider a vehicle as a lead vehicle | |
| ''' | |
| self.v0 = v0 | |
| self.s0 = s0 | |
| self.T = T | |
| self.a = a | |
| self.b = b | |
| self.delta = delta | |
| self.lookahead_path_length = lookahead_path_length | |
| self.lead_distance_threshold = lead_distance_threshold | |
| def update(self, state, path, dt, neighbors): | |
| ''' | |
| Args: | |
| state: current state of the vehicle, of size [x, y, yaw, speed] | |
| path: the path to follow, of size (N, [x, y, yaw]) | |
| dt: time duration | |
| neighbors: the future states of the neighbors, of size (K, T, [x, y, yaw, speed]) | |
| ''' | |
| if path is None: | |
| return deepcopy(state) | |
| # find the nearest point in the path | |
| dists = torch.norm(path[:, :2] - state[None, :2], dim=1) | |
| nearest_index = torch.argmin(dists) | |
| # lookahead_distance = state[3] * dt | |
| # lookahead_targe = state[:2] + np.array([np.sin(state[2]) * lookahead_distance, np.cos(state[2]) * lookahead_distance]) | |
| # # target = path[-1] | |
| # is_end = False | |
| # target_idx = torch.argmin(torch.norm(path[:, :2] - lookahead_targe, dim=-1)) | |
| # target = path[target_idx] | |
| # find the target point | |
| lookahead_distance = state[3] * dt | |
| target = path[-1] | |
| is_end = True | |
| for i in range(nearest_index + 1, len(path)): | |
| if torch.norm(path[i, :2] - state[:2]) > lookahead_distance: | |
| target = path[i] | |
| is_end = False | |
| break | |
| # distance between neighbors and the path | |
| lookahead_path = path[nearest_index + 1:][:self.lookahead_path_length] | |
| lookahead_neighbors = neighbors[..., None, :].expand( | |
| -1, -1, lookahead_path.shape[0], -1 | |
| ) # (K, T, n, 4) | |
| dists_neighbors = torch.norm( | |
| lookahead_neighbors[..., :2] - lookahead_path[None, None, :, :2], dim=-1 | |
| ) # (K, T, n) | |
| indices_neighbors = torch.arange( | |
| lookahead_path.shape[0] | |
| )[None, None].expand_as(dists_neighbors) | |
| # determine lead vehicles | |
| is_lead = (dists_neighbors < self.lead_distance_threshold) | |
| if is_lead.any(): | |
| # compute lead distance | |
| indices_lead = indices_neighbors[is_lead] # (num_lead) | |
| lookahead_lengths = torch.cumsum(torch.norm( | |
| lookahead_path[1:, :2] - lookahead_path[:-1, :2], dim=1 | |
| ), dim=0) | |
| lookahead_lengths = torch.cat([lookahead_lengths, lookahead_lengths[-1:]]) | |
| lead_distance = lookahead_lengths[indices_lead] | |
| # compute lead speed | |
| states_lead = lookahead_neighbors[is_lead] # (num_lead, 4) | |
| ori_speed_lead = states_lead[:, 3] | |
| yaw_lead = states_lead[:, 2] | |
| yaw_path = lookahead_path[indices_lead, 2] | |
| lead_speed = ori_speed_lead * (yaw_lead - yaw_path).cos() | |
| # compute acceleration | |
| ego_speed = state[3] | |
| delta_v = ego_speed - lead_speed | |
| s_star = self.s0 + \ | |
| (ego_speed * self.T + ego_speed * delta_v / (2 * math.sqrt(self.a * self.b))).clamp(min=0) | |
| acceleration = self.a * (1 - (ego_speed / self.v0) ** self.delta - (s_star / lead_distance) ** 2) | |
| acceleration = acceleration.min() | |
| else: | |
| acceleration = self.a * (1 - (state[3] / self.v0) ** self.delta) | |
| # compute the new state | |
| target_distance = torch.norm(target[:2] - state[:2]) | |
| ratio = lookahead_distance / target_distance.clamp(min=1e-6) | |
| ratio = ratio.clamp(max=1.0) | |
| new_state = deepcopy(state) | |
| new_state[:2] = state[:2] + ratio * (target[:2] - state[:2]) | |
| new_state[2] = torch.atan2( | |
| state[2].sin() + ratio * (target[2].sin() - state[2].sin()), | |
| state[2].cos() + ratio * (target[2].cos() - state[2].cos()) | |
| ) | |
| if is_end: | |
| new_state[3] = 0 | |
| else: | |
| new_state[3] = (state[3] + acceleration * dt).clamp(min=0) | |
| return new_state | |
| class AttackPlanner: | |
| def __init__(self, pred_steps=20, ATTACK_FREQ = 3, best_k=1, device='cpu'): | |
| self.device = device | |
| self.predict_steps = pred_steps | |
| self.best_k = best_k | |
| self.planner = SplinePlanner( | |
| device, | |
| N_seg=self.predict_steps, | |
| acce_grid=torch.linspace(-2, 5, 10).to(self.device), | |
| acce_bound=[-6, 5], | |
| vbound=[-2, 50] | |
| ) | |
| self.planner.psi_bound = [-math.pi * 2, math.pi * 2] | |
| self.exec_traj = None | |
| self.exec_pointer = 1 | |
| def update( | |
| self, state, unified_map, dt, | |
| neighbors, attacked_states, | |
| new_plan=True | |
| ): | |
| ''' | |
| Args: | |
| state: current state of the vehicle, of size [x, y, yaw, speed] | |
| vector_map: the vector map | |
| attacked_states: future states of the attacked agent, of size (T, [x, y, yaw, speed]) | |
| neighbors: future states of the neighbors, of size (K, T, [x, y, yaw, speed]) | |
| new_plan: whether to generate a new plan | |
| ''' | |
| assert self.exec_pointer > 0 | |
| # directly execute the current plan | |
| if not new_plan: | |
| if self.exec_traj is not None and \ | |
| self.exec_pointer < self.exec_traj.shape[0]: | |
| next_state = self.exec_traj[self.exec_pointer] | |
| self.exec_pointer += 1 | |
| return next_state | |
| else: | |
| new_plan = True | |
| assert attacked_states.shape[0] == self.predict_steps | |
| # state: [x, y, yaw, speed] | |
| x, y, yaw, speed = state | |
| # query vector map to get lanes | |
| query_xyzr = np.array([x, y, 0, yaw + np.pi / 2]) | |
| # query_xyzr = unified_map.xyzr_local2world(np.array([x, y, 0, yaw])) | |
| # lanes = unified_map.vector_map.get_lanes_within(query_xyzr[:3], dist=30) | |
| # lanes = [unified_map.batch_xyzr_world2local(l.center.xyzh)[:, [0,1,3]] for l in lanes] | |
| # lanes = [l.center.xyzh[:, [0,1,3]] for l in lanes] | |
| lanes = None | |
| # for lane in lanes: | |
| # plt.plot(lane[:, 0], lane[:, 1], 'k--', linewidth=0.5, alpha=0.5) | |
| # generate spline trajectories | |
| x0 = torch.tensor([query_xyzr[0], query_xyzr[1], speed, query_xyzr[3]], device=self.device) | |
| possible_trajs, xf_set = self.planner.gen_trajectories(x0, self.predict_steps * dt, lanes, | |
| dyn_filter=True) # (num_trajs, T-1, [x, y, v, a, yaw, r, t]) | |
| if possible_trajs.shape[0] == 0: | |
| trajs = constant_headaway(state[None], self.predict_steps, dt) # (1, T, [x, y, yaw, speed]) | |
| else: | |
| trajs = torch.cat([ | |
| state[None, None].expand(possible_trajs.shape[0], -1, -1), | |
| possible_trajs[..., [0, 1, 4, 2]] | |
| ], dim=1) | |
| # select the best trajectory | |
| attack_distance = torch.norm(attacked_states[None, :, :2] - trajs[..., :2], dim=-1) | |
| cost_attack = attack_distance.min(dim=1).values | |
| cost_collision = ( | |
| torch.norm(neighbors[None, ..., :2] - trajs[:, None, :, :2], dim=-1).min(dim=-1).values < 2.0).sum( | |
| dim=-1) | |
| cost = cost_attack + 0.1 * cost_collision | |
| values, indices = torch.topk(cost, self.best_k, largest=False) | |
| random_index = torch.randint(0, self.best_k, (1,)).item() | |
| selected_index = indices[random_index] | |
| traj_best = trajs[selected_index] | |
| # produce next state | |
| self.exec_traj = traj_best | |
| self.exec_traj[:, 2] -= np.pi / 2 | |
| self.exec_pointer = 1 | |
| next_state = self.exec_traj[self.exec_pointer] | |
| # next_state[0] = -next_state[0] | |
| self.exec_pointer += 1 | |
| return next_state | |
| class ConstantPlanner: | |
| def __init__(self): | |
| return | |
| def update(self, state, dt): | |
| a, b, yaw, v = state | |
| a = a - v * np.sin(yaw) * dt | |
| b = b + v * np.cos(yaw) * dt | |
| return torch.tensor([a, b, yaw, v]) | |
| class UnicyclePlanner: | |
| def __init__(self, uc_path, speed=1.0): | |
| self.uc_model = unicycle.restore(torch.load(uc_path, weights_only=False)) | |
| self.t = 0 | |
| self.speed = speed | |
| def update(self, dt): | |
| self.t += dt * self.speed | |
| a, b, v, pitchroll, yaw, h = self.uc_model.forward(self.t) | |
| # return torch.tensor([a, b, yaw, v]), pitchroll.detach().cpu(), h.item() | |
| return torch.tensor([a, b, yaw, v]) | |