Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| # Copyright (c) 2019 Computer Vision Center (CVC) at the Universitat Autonoma de | |
| # Barcelona (UAB). | |
| # | |
| # This work is licensed under the terms of the MIT license. | |
| # For a copy, see <https://opensource.org/licenses/MIT>. | |
| import glob | |
| import os | |
| import sys | |
| import numpy as np | |
| try: | |
| sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % ( | |
| sys.version_info.major, | |
| sys.version_info.minor, | |
| 'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0]) | |
| except IndexError: | |
| pass | |
| import carla | |
| import argparse | |
| import random | |
| import time | |
| red = carla.Color(255, 0, 0) | |
| green = carla.Color(0, 255, 0) | |
| blue = carla.Color(47, 210, 231) | |
| cyan = carla.Color(0, 255, 255) | |
| yellow = carla.Color(255, 255, 0) | |
| orange = carla.Color(255, 162, 0) | |
| white = carla.Color(255, 255, 255) | |
| black = carla.Color(0,0,0) | |
| trail_life_time = 10 | |
| waypoint_separation = 4 | |
| argparser = argparse.ArgumentParser() | |
| argparser.add_argument( | |
| '--host', | |
| metavar='H', | |
| default='127.0.0.1', | |
| help='IP of the host server (default: 127.0.0.1)') | |
| argparser.add_argument( | |
| '-p', '--port', | |
| metavar='P', | |
| default=2000, | |
| type=int, | |
| help='TCP port to listen to (default: 2000)') | |
| argparser.add_argument( | |
| '-i', '--info', | |
| action='store_true', | |
| help='Show text information') | |
| argparser.add_argument( | |
| '-x', | |
| default=0.0, | |
| type=float, | |
| help='X start position (default: 0.0)') | |
| argparser.add_argument( | |
| '-y', | |
| default=0.0, | |
| type=float, | |
| help='Y start position (default: 0.0)') | |
| argparser.add_argument( | |
| '-z', | |
| default=0.0, | |
| type=float, | |
| help='Z start position (default: 0.0)') | |
| argparser.add_argument( | |
| '-s', '--seed', | |
| metavar='S', | |
| default=os.getpid(), | |
| type=int, | |
| help='Seed for the random path (default: program pid)') | |
| argparser.add_argument( | |
| '-t', '--tick-time', | |
| metavar='T', | |
| default=0.2, | |
| type=float, | |
| help='Tick time between updates (forward velocity) (default: 0.2)') | |
| def get_angle(v1,v2): | |
| def dot(v1,v2): | |
| return (v1.x * v2.x + v1.y * v2.y) | |
| def cross(v1,v2): | |
| return v1.x*v2.y - v1.y*v2.x | |
| def len(v): | |
| return np.sqrt(v.x*v.x + v.y* v.y) | |
| sign = np.sign(cross(v1,v2)) | |
| angle = np.arccos(dot(v1,v2)/(len(v1) * len(v2))) | |
| angle = sign * np.degrees(angle) | |
| return angle | |
| def draw_transform(debug, trans, col=carla.Color(255, 0, 0), lt=-1): | |
| debug.draw_arrow( | |
| trans.location, trans.location + trans.get_forward_vector(), | |
| thickness=0.05, arrow_size=0.1, color=col, life_time=lt) | |
| def draw_waypoint_union(debug, w0, w1, color=carla.Color(255, 0, 0), lt=5): | |
| debug.draw_line( | |
| w0.transform.location + carla.Location(z=0.25), | |
| w1.transform.location + carla.Location(z=0.25), | |
| thickness=0.1, color=color, life_time=lt, persistent_lines=False) | |
| debug.draw_point(w1.transform.location + carla.Location(z=0.25), 0.2, color, lt, False) | |
| def draw_waypoint_info(debug, w, lt=30): | |
| w_loc = w.transform.location | |
| debug.draw_string(w_loc + carla.Location(z=0.5), "lane: " + str(w.lane_id), False, yellow, lt) | |
| debug.draw_string(w_loc + carla.Location(z=1.0), "road: " + str(w.road_id), False, black, lt) | |
| debug.draw_string(w_loc + carla.Location(z=-.5), str(w.lane_change), False, red, lt) | |
| def draw_junction(debug, junction, l_time=10): | |
| """Draws a junction bounding box and the initial and final waypoint of every lane.""" | |
| # draw bounding box | |
| box = junction.bounding_box | |
| point1 = box.location + carla.Location(x=box.extent.x, y=box.extent.y, z=2) | |
| point2 = box.location + carla.Location(x=-box.extent.x, y=box.extent.y, z=2) | |
| point3 = box.location + carla.Location(x=-box.extent.x, y=-box.extent.y, z=2) | |
| point4 = box.location + carla.Location(x=box.extent.x, y=-box.extent.y, z=2) | |
| debug.draw_line( | |
| point1, point2, | |
| thickness=0.1, color=orange, life_time=l_time, persistent_lines=False) | |
| debug.draw_line( | |
| point2, point3, | |
| thickness=0.1, color=orange, life_time=l_time, persistent_lines=False) | |
| debug.draw_line( | |
| point3, point4, | |
| thickness=0.1, color=orange, life_time=l_time, persistent_lines=False) | |
| debug.draw_line( | |
| point4, point1, | |
| thickness=0.1, color=orange, life_time=l_time, persistent_lines=False) | |
| # draw junction pairs (begin-end) of every lane | |
| junction_w = junction.get_waypoints(carla.LaneType.Any) | |
| for pair_w in junction_w: | |
| draw_transform(debug, pair_w[0].transform, orange, l_time) | |
| debug.draw_point( | |
| pair_w[0].transform.location + carla.Location(z=0.75), 0.1, orange, l_time, False) | |
| draw_transform(debug, pair_w[1].transform, orange, l_time) | |
| debug.draw_point( | |
| pair_w[1].transform.location + carla.Location(z=0.75), 0.1, orange, l_time, False) | |
| debug.draw_line( | |
| pair_w[0].transform.location + carla.Location(z=0.75), | |
| pair_w[1].transform.location + carla.Location(z=0.75), 0.1, white, l_time, False) | |
| def random_walk(map, debug, args, loc): | |
| m = map | |
| current_w = m.get_waypoint(loc) | |
| # for item in current_w.next(4): | |
| # print(item.lane_type) | |
| # print(current_w.road_id) | |
| # main loop | |
| waypoints = m.generate_waypoints(10) | |
| trail_life_time = 600 | |
| for w in waypoints: | |
| if w.transform.location.distance(loc) > 50: | |
| continue | |
| # Render some nice information, notice that you can't see the strings if you are using an editor camera | |
| if args.info: | |
| draw_waypoint_info(debug, w, trail_life_time) | |
| # time.sleep(args.tick_time) | |
| class ScenarioMap(): | |
| def __init__(self, map) -> None: | |
| self.lane_id_map = {} | |
| self.road_id_lanes = {} | |
| self.road_relation = {} | |
| self.path_round_id_map = {} | |
| self.topology = [] | |
| self.waypoints = [] | |
| self.selected_map = [] | |
| self._carla_map = map | |
| self._sampling_resolution = 8 | |
| self._build_topology() | |
| self._build_relation() | |
| def waypoint2dict(self, w): | |
| return { | |
| 'is_junction':w.is_junction, | |
| 'lane_id':w.lane_id, | |
| 'road_id':w.road_id, | |
| 'lane_width':w.lane_width, | |
| 'location':list(map(round,(w.transform.location.x,w.transform.location.y,w.transform.location.z))), | |
| 'heading':round(w.transform.rotation.yaw) | |
| } | |
| def _build_relation(self): | |
| relation = {} | |
| for se, lane in self.path_round_id_map.items(): | |
| lane_id = lane['lane_id'] | |
| last_wp = lane['path'][-1] | |
| next_wps = last_wp.next(1) | |
| if lane_id not in relation: | |
| relation[lane_id] = [] | |
| for wp in next_wps: | |
| if wp.road_id in self.road_id_lanes and wp.lane_id in self.road_id_lanes[wp.road_id]: | |
| relation[lane_id].append(self.road_id_lanes[wp.road_id][wp.lane_id]) | |
| else: | |
| print('not exist roadid & lane_id') | |
| self.road_relation = relation | |
| def _build_topology(self): | |
| _wmap = self._carla_map | |
| for segment in _wmap.get_topology(): | |
| wp1, wp2 = segment | |
| path = wp1.next_until_lane_end(self._sampling_resolution) | |
| if len(path) != 0: | |
| wp_end = path[-1] | |
| l1, l2 = wp1.transform.location, wp_end.transform.location | |
| if l1.distance(l2) < 1: | |
| continue | |
| locs = list(np.round([l1.x,l1.y,l2.z,l2.x,l2.y,l2.z])) | |
| path_round = '-'.join(map(str,locs)) | |
| else: | |
| continue | |
| if path_round not in self.path_round_id_map: | |
| seg_dict = dict() | |
| seg_dict['entry'], seg_dict['exit'] = wp1, wp_end | |
| seg_dict['entryxyz'], seg_dict['exitxyz'] = locs[0:3], locs[3:] | |
| seg_dict['heading'] = round(wp1.transform.rotation.yaw) | |
| seg_dict['road_id'] = wp1.road_id | |
| seg_dict['path'] = [wp1] + path | |
| seg_dict['lane'] = wp1.lane_id | |
| seg_dict['lane_id'] = len(self.lane_id_map) | |
| seg_dict['is_junction'] = wp1.is_junction | |
| if wp1.road_id not in self.road_id_lanes: | |
| self.road_id_lanes[wp1.road_id] = {wp1.lane_id: seg_dict['lane_id']} | |
| else: | |
| self.road_id_lanes[wp1.road_id][wp1.lane_id] = seg_dict['lane_id'] | |
| self.lane_id_map[seg_dict['lane_id']] = seg_dict | |
| self.path_round_id_map[path_round] = seg_dict | |
| self.topology.append(seg_dict) | |
| self.waypoints.append(wp1) | |
| self.waypoints.append(wp_end) | |
| # print(len(self.waypoints), len(set(self.waypoints))) | |
| def get_selected_map(self, loc): | |
| waypoints_nearby_road = [] | |
| for w in self.waypoints: | |
| if w.transform.location.distance(loc) < 45: | |
| waypoints_nearby_road.append(w) | |
| selected_map = {'lanes':[],'junctions':[]} | |
| junc_set = set() | |
| for w in waypoints_nearby_road: | |
| if w.is_junction is False: | |
| for lane in self.topology: | |
| if lane not in selected_map['lanes'] and w in lane['path']: | |
| selected_map['lanes'].append(lane) | |
| else: | |
| junc = w.get_junction() | |
| if junc.id not in junc_set: | |
| selected_map['junctions'].append({'extent':list(map(round,(junc.bounding_box.extent.x,junc.bounding_box.extent.y,junc.bounding_box.extent.z))), | |
| 'location':list(map(round,(junc.bounding_box.location.x,junc.bounding_box.location.y,junc.bounding_box.location.z))), | |
| 'heading':round(junc.bounding_box.rotation.yaw), | |
| # 'id':junc.id | |
| }) | |
| junc_set.add(junc.id) | |
| self.selected_map = selected_map | |
| select_lane_ids = set() | |
| for lane in selected_map['lanes']: | |
| select_lane_ids.add(lane['lane_id']) | |
| self.select_lane_ids = select_lane_ids | |
| return self.selected_map | |
| def visualize(self, debug): | |
| lane_relation = self.get_lane_relation() | |
| for lane_id in self.select_lane_ids: | |
| lane = self.lane_id_map[lane_id] | |
| for wp_idx in range(len(lane['path']) - 1): | |
| draw_waypoint_union(debug, lane['path'][wp_idx], lane['path'][wp_idx +1], lt=600) | |
| if lane_id not in lane_relation: | |
| continue | |
| for next_lane_id, relation_type in lane_relation[lane_id]: | |
| next_lane = self.lane_id_map[next_lane_id] | |
| draw_waypoint_union(debug, lane['path'][-1], next_lane['path'][0], lt=600) | |
| return | |
| def get_lane_relation(self,): | |
| lane_relation = {} | |
| merge_lane = {} | |
| for src, dests in self.road_relation.items(): | |
| src_lane = self.lane_id_map[src] | |
| if src not in self.select_lane_ids or src_lane['is_junction']: | |
| continue | |
| for dest in dests: | |
| next_lane = self.lane_id_map[dest] | |
| if next_lane['is_junction']: # next lane by other road | |
| if dest in self.road_relation: | |
| sec_dests = self.road_relation[dest] | |
| for sec_dest in sec_dests: | |
| if sec_dest not in self.select_lane_ids: | |
| continue | |
| sec_dest_lane = self.lane_id_map[sec_dest] | |
| dest_waypoint = sec_dest_lane['path'][0] | |
| src_waypoint = src_lane['path'][0] | |
| src_transform = src_waypoint.transform | |
| dest_transform = dest_waypoint.transform | |
| s2d_degree = get_angle(src_transform.get_forward_vector(), dest_transform.get_forward_vector()) | |
| relation = 'straight' | |
| if -10 < s2d_degree < 10: | |
| relation = 'straight' | |
| elif 80 < s2d_degree < 100: | |
| relation = 'right' | |
| elif -100 < s2d_degree < -80: | |
| relation = 'left' | |
| elif 170 < s2d_degree < 190: | |
| relation = 'back' | |
| else: | |
| relation = 'other' | |
| if src not in lane_relation: | |
| lane_relation[src] = [(sec_dest,relation)] | |
| else: | |
| if sec_dest in lane_relation[src]: | |
| print('double exist road relation', src, sec_dest) | |
| lane_relation[src].append((sec_dest,relation)) | |
| else: | |
| if dest in self.select_lane_ids: | |
| src_waypoint = src_lane['path'][0] | |
| dest_lane = self.lane_id_map[dest] | |
| dest_waypoint = dest_lane['path'][0] | |
| src_transform = src_waypoint.transform | |
| dest_transform = dest_waypoint.transform | |
| s2d_degree = get_angle(src_transform.get_forward_vector(),dest_transform.get_forward_vector()) | |
| relation = 'straight' | |
| if -10 < s2d_degree < 10: | |
| relation = 'straight' | |
| if len(dests) == 1: | |
| if dest not in merge_lane: | |
| merge_lane[src] = [src, dest] | |
| merge_lane[dest] = [] | |
| else: | |
| merge_lane[src] = [src] + merge_lane[dest] | |
| merge_lane[dest] = [] | |
| elif 80 < s2d_degree < 100: | |
| relation = 'right' | |
| elif -100 < s2d_degree < -80: | |
| relation = 'left' | |
| elif 170 < s2d_degree < 190: | |
| relation = 'back' | |
| else: | |
| relation = 'other' | |
| if src not in lane_relation: | |
| lane_relation[src] = [(dest, relation)] | |
| else: | |
| lane_relation[src].append((dest,relation)) | |
| for i in range(6): | |
| for lane in merge_lane.keys(): | |
| if len(merge_lane[lane]) == 0: | |
| continue | |
| next_lane = merge_lane[lane][-1] | |
| while (next_lane in merge_lane and len(merge_lane[next_lane])) > 0: | |
| merge_lane[lane] = merge_lane[lane] + merge_lane[next_lane][1:] | |
| merge_lane[next_lane] = [] | |
| remove_selected_lanes = [] | |
| for _, lanes in merge_lane.items(): | |
| if len(lanes) < 1: | |
| continue | |
| target = lanes[-1] | |
| target_lane = self.lane_id_map[target] | |
| new_path = [] | |
| for lane_id in lanes[0:-1]: | |
| if lane_id in self.select_lane_ids: | |
| self.select_lane_ids.remove(lane_id) | |
| if lane_id in lane_relation: | |
| if len(lane_relation[lane_id]) >1 : | |
| print('muti relation in merged lane') | |
| lane_relation.pop(lane_id) | |
| for idx, lane in enumerate(self.selected_map['lanes']): | |
| if lane['lane_id'] == lane_id: | |
| if idx == 0: | |
| target_lane['entry'] = lane['entry'] | |
| target_lane['entryxyz'] = lane['entryxyz'] | |
| if idx == len(self.selected_map['lanes']) - 1: | |
| target_lane['exit'] = lane['exit'] | |
| target_lane['exitxyz'] = lane['exitxyz'] | |
| new_path.extend(lane['path']) | |
| remove_selected_lanes.append(lane) | |
| new_path.extend(target_lane['path']) | |
| target_lane['path'] = new_path | |
| for item in remove_selected_lanes: | |
| self.selected_map['lanes'].remove(item) | |
| # print(self.selected_map['lanes']) | |
| return lane_relation | |
| def get_same_road_relation(self,): | |
| same_road = [] | |
| for k, v in self.road_id_lanes.items(): | |
| same_forward = [] | |
| same_backward = [] | |
| for l, lane_id in v.items(): | |
| if lane_id in self.select_lane_ids: | |
| if l > 0: | |
| same_forward.append(lane_id) | |
| else: | |
| same_backward.append(lane_id) | |
| if len(same_forward) != 0: | |
| same_road.append(same_forward) | |
| if len(same_backward) != 0: | |
| same_road.append(same_backward) | |
| self.same_road = same_road | |
| return same_road | |
| def generate_map_info(self, loc): | |
| self.get_selected_map(loc) | |
| lane_relation = self.get_lane_relation() | |
| road_relation = self.get_same_road_relation() | |
| select_lanes = [] | |
| selected_map = {} | |
| for lane in self.selected_map['lanes']: | |
| lane_js = { | |
| 'entryxyz':lane['entryxyz'], | |
| 'exitxyz':lane['exitxyz'], | |
| 'path': [self.waypoint2dict(w) for w in lane['path']], | |
| 'lane':lane['lane'], | |
| 'lane_id':lane['lane_id'], | |
| 'heading':lane['heading'], | |
| 'road_id':lane['road_id'], | |
| } | |
| select_lanes.append(lane_js) | |
| selected_map['lanes'] = select_lanes | |
| selected_map['lane_relations'] = lane_relation | |
| selected_map['road'] = road_relation | |
| return selected_map | |
| def main(): | |
| args = argparser.parse_args() | |
| try: | |
| client = carla.Client(args.host, args.port) | |
| client.set_timeout(2.0) | |
| world = client.get_world() | |
| m, debug = world.get_map(), world.debug | |
| random.seed(args.seed) | |
| loc = carla.Location(args.x, args.y, args.z) | |
| # print("Initial location: ", loc) | |
| scenario_map = ScenarioMap(m) | |
| selected_map = scenario_map.generate_map_info(loc) | |
| # print(selected_map) | |
| print(len(selected_map['lanes'])) | |
| print(selected_map['lane_relations']) | |
| scenario_map.visualize(debug=debug) | |
| finally: | |
| pass | |
| if __name__ == '__main__': | |
| main() |