ScenarioGenDemo / runner /lane_explorer.py
puffyyy's picture
Upload 101 files
33bdf0e
#!/usr/bin/env python
# Copyright (c) 2019 Computer Vision Center (CVC) at the Universitat Autonoma de
# Barcelona (UAB).
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
import glob
import os
import sys
import numpy as np
try:
sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
sys.version_info.major,
sys.version_info.minor,
'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
pass
import carla
import argparse
import random
import time
red = carla.Color(255, 0, 0)
green = carla.Color(0, 255, 0)
blue = carla.Color(47, 210, 231)
cyan = carla.Color(0, 255, 255)
yellow = carla.Color(255, 255, 0)
orange = carla.Color(255, 162, 0)
white = carla.Color(255, 255, 255)
black = carla.Color(0,0,0)
trail_life_time = 10
waypoint_separation = 4
argparser = argparse.ArgumentParser()
argparser.add_argument(
'--host',
metavar='H',
default='127.0.0.1',
help='IP of the host server (default: 127.0.0.1)')
argparser.add_argument(
'-p', '--port',
metavar='P',
default=2000,
type=int,
help='TCP port to listen to (default: 2000)')
argparser.add_argument(
'-i', '--info',
action='store_true',
help='Show text information')
argparser.add_argument(
'-x',
default=0.0,
type=float,
help='X start position (default: 0.0)')
argparser.add_argument(
'-y',
default=0.0,
type=float,
help='Y start position (default: 0.0)')
argparser.add_argument(
'-z',
default=0.0,
type=float,
help='Z start position (default: 0.0)')
argparser.add_argument(
'-s', '--seed',
metavar='S',
default=os.getpid(),
type=int,
help='Seed for the random path (default: program pid)')
argparser.add_argument(
'-t', '--tick-time',
metavar='T',
default=0.2,
type=float,
help='Tick time between updates (forward velocity) (default: 0.2)')
def get_angle(v1,v2):
def dot(v1,v2):
return (v1.x * v2.x + v1.y * v2.y)
def cross(v1,v2):
return v1.x*v2.y - v1.y*v2.x
def len(v):
return np.sqrt(v.x*v.x + v.y* v.y)
sign = np.sign(cross(v1,v2))
angle = np.arccos(dot(v1,v2)/(len(v1) * len(v2)))
angle = sign * np.degrees(angle)
return angle
def draw_transform(debug, trans, col=carla.Color(255, 0, 0), lt=-1):
debug.draw_arrow(
trans.location, trans.location + trans.get_forward_vector(),
thickness=0.05, arrow_size=0.1, color=col, life_time=lt)
def draw_waypoint_union(debug, w0, w1, color=carla.Color(255, 0, 0), lt=5):
debug.draw_line(
w0.transform.location + carla.Location(z=0.25),
w1.transform.location + carla.Location(z=0.25),
thickness=0.1, color=color, life_time=lt, persistent_lines=False)
debug.draw_point(w1.transform.location + carla.Location(z=0.25), 0.2, color, lt, False)
def draw_waypoint_info(debug, w, lt=30):
w_loc = w.transform.location
debug.draw_string(w_loc + carla.Location(z=0.5), "lane: " + str(w.lane_id), False, yellow, lt)
debug.draw_string(w_loc + carla.Location(z=1.0), "road: " + str(w.road_id), False, black, lt)
debug.draw_string(w_loc + carla.Location(z=-.5), str(w.lane_change), False, red, lt)
def draw_junction(debug, junction, l_time=10):
"""Draws a junction bounding box and the initial and final waypoint of every lane."""
# draw bounding box
box = junction.bounding_box
point1 = box.location + carla.Location(x=box.extent.x, y=box.extent.y, z=2)
point2 = box.location + carla.Location(x=-box.extent.x, y=box.extent.y, z=2)
point3 = box.location + carla.Location(x=-box.extent.x, y=-box.extent.y, z=2)
point4 = box.location + carla.Location(x=box.extent.x, y=-box.extent.y, z=2)
debug.draw_line(
point1, point2,
thickness=0.1, color=orange, life_time=l_time, persistent_lines=False)
debug.draw_line(
point2, point3,
thickness=0.1, color=orange, life_time=l_time, persistent_lines=False)
debug.draw_line(
point3, point4,
thickness=0.1, color=orange, life_time=l_time, persistent_lines=False)
debug.draw_line(
point4, point1,
thickness=0.1, color=orange, life_time=l_time, persistent_lines=False)
# draw junction pairs (begin-end) of every lane
junction_w = junction.get_waypoints(carla.LaneType.Any)
for pair_w in junction_w:
draw_transform(debug, pair_w[0].transform, orange, l_time)
debug.draw_point(
pair_w[0].transform.location + carla.Location(z=0.75), 0.1, orange, l_time, False)
draw_transform(debug, pair_w[1].transform, orange, l_time)
debug.draw_point(
pair_w[1].transform.location + carla.Location(z=0.75), 0.1, orange, l_time, False)
debug.draw_line(
pair_w[0].transform.location + carla.Location(z=0.75),
pair_w[1].transform.location + carla.Location(z=0.75), 0.1, white, l_time, False)
def random_walk(map, debug, args, loc):
m = map
current_w = m.get_waypoint(loc)
# for item in current_w.next(4):
# print(item.lane_type)
# print(current_w.road_id)
# main loop
waypoints = m.generate_waypoints(10)
trail_life_time = 600
for w in waypoints:
if w.transform.location.distance(loc) > 50:
continue
# Render some nice information, notice that you can't see the strings if you are using an editor camera
if args.info:
draw_waypoint_info(debug, w, trail_life_time)
# time.sleep(args.tick_time)
class ScenarioMap():
def __init__(self, map) -> None:
self.lane_id_map = {}
self.road_id_lanes = {}
self.road_relation = {}
self.path_round_id_map = {}
self.topology = []
self.waypoints = []
self.selected_map = []
self._carla_map = map
self._sampling_resolution = 8
self._build_topology()
self._build_relation()
def waypoint2dict(self, w):
return {
'is_junction':w.is_junction,
'lane_id':w.lane_id,
'road_id':w.road_id,
'lane_width':w.lane_width,
'location':list(map(round,(w.transform.location.x,w.transform.location.y,w.transform.location.z))),
'heading':round(w.transform.rotation.yaw)
}
def _build_relation(self):
relation = {}
for se, lane in self.path_round_id_map.items():
lane_id = lane['lane_id']
last_wp = lane['path'][-1]
next_wps = last_wp.next(1)
if lane_id not in relation:
relation[lane_id] = []
for wp in next_wps:
if wp.road_id in self.road_id_lanes and wp.lane_id in self.road_id_lanes[wp.road_id]:
relation[lane_id].append(self.road_id_lanes[wp.road_id][wp.lane_id])
else:
print('not exist roadid & lane_id')
self.road_relation = relation
def _build_topology(self):
_wmap = self._carla_map
for segment in _wmap.get_topology():
wp1, wp2 = segment
path = wp1.next_until_lane_end(self._sampling_resolution)
if len(path) != 0:
wp_end = path[-1]
l1, l2 = wp1.transform.location, wp_end.transform.location
if l1.distance(l2) < 1:
continue
locs = list(np.round([l1.x,l1.y,l2.z,l2.x,l2.y,l2.z]))
path_round = '-'.join(map(str,locs))
else:
continue
if path_round not in self.path_round_id_map:
seg_dict = dict()
seg_dict['entry'], seg_dict['exit'] = wp1, wp_end
seg_dict['entryxyz'], seg_dict['exitxyz'] = locs[0:3], locs[3:]
seg_dict['heading'] = round(wp1.transform.rotation.yaw)
seg_dict['road_id'] = wp1.road_id
seg_dict['path'] = [wp1] + path
seg_dict['lane'] = wp1.lane_id
seg_dict['lane_id'] = len(self.lane_id_map)
seg_dict['is_junction'] = wp1.is_junction
if wp1.road_id not in self.road_id_lanes:
self.road_id_lanes[wp1.road_id] = {wp1.lane_id: seg_dict['lane_id']}
else:
self.road_id_lanes[wp1.road_id][wp1.lane_id] = seg_dict['lane_id']
self.lane_id_map[seg_dict['lane_id']] = seg_dict
self.path_round_id_map[path_round] = seg_dict
self.topology.append(seg_dict)
self.waypoints.append(wp1)
self.waypoints.append(wp_end)
# print(len(self.waypoints), len(set(self.waypoints)))
def get_selected_map(self, loc):
waypoints_nearby_road = []
for w in self.waypoints:
if w.transform.location.distance(loc) < 45:
waypoints_nearby_road.append(w)
selected_map = {'lanes':[],'junctions':[]}
junc_set = set()
for w in waypoints_nearby_road:
if w.is_junction is False:
for lane in self.topology:
if lane not in selected_map['lanes'] and w in lane['path']:
selected_map['lanes'].append(lane)
else:
junc = w.get_junction()
if junc.id not in junc_set:
selected_map['junctions'].append({'extent':list(map(round,(junc.bounding_box.extent.x,junc.bounding_box.extent.y,junc.bounding_box.extent.z))),
'location':list(map(round,(junc.bounding_box.location.x,junc.bounding_box.location.y,junc.bounding_box.location.z))),
'heading':round(junc.bounding_box.rotation.yaw),
# 'id':junc.id
})
junc_set.add(junc.id)
self.selected_map = selected_map
select_lane_ids = set()
for lane in selected_map['lanes']:
select_lane_ids.add(lane['lane_id'])
self.select_lane_ids = select_lane_ids
return self.selected_map
def visualize(self, debug):
lane_relation = self.get_lane_relation()
for lane_id in self.select_lane_ids:
lane = self.lane_id_map[lane_id]
for wp_idx in range(len(lane['path']) - 1):
draw_waypoint_union(debug, lane['path'][wp_idx], lane['path'][wp_idx +1], lt=600)
if lane_id not in lane_relation:
continue
for next_lane_id, relation_type in lane_relation[lane_id]:
next_lane = self.lane_id_map[next_lane_id]
draw_waypoint_union(debug, lane['path'][-1], next_lane['path'][0], lt=600)
return
def get_lane_relation(self,):
lane_relation = {}
merge_lane = {}
for src, dests in self.road_relation.items():
src_lane = self.lane_id_map[src]
if src not in self.select_lane_ids or src_lane['is_junction']:
continue
for dest in dests:
next_lane = self.lane_id_map[dest]
if next_lane['is_junction']: # next lane by other road
if dest in self.road_relation:
sec_dests = self.road_relation[dest]
for sec_dest in sec_dests:
if sec_dest not in self.select_lane_ids:
continue
sec_dest_lane = self.lane_id_map[sec_dest]
dest_waypoint = sec_dest_lane['path'][0]
src_waypoint = src_lane['path'][0]
src_transform = src_waypoint.transform
dest_transform = dest_waypoint.transform
s2d_degree = get_angle(src_transform.get_forward_vector(), dest_transform.get_forward_vector())
relation = 'straight'
if -10 < s2d_degree < 10:
relation = 'straight'
elif 80 < s2d_degree < 100:
relation = 'right'
elif -100 < s2d_degree < -80:
relation = 'left'
elif 170 < s2d_degree < 190:
relation = 'back'
else:
relation = 'other'
if src not in lane_relation:
lane_relation[src] = [(sec_dest,relation)]
else:
if sec_dest in lane_relation[src]:
print('double exist road relation', src, sec_dest)
lane_relation[src].append((sec_dest,relation))
else:
if dest in self.select_lane_ids:
src_waypoint = src_lane['path'][0]
dest_lane = self.lane_id_map[dest]
dest_waypoint = dest_lane['path'][0]
src_transform = src_waypoint.transform
dest_transform = dest_waypoint.transform
s2d_degree = get_angle(src_transform.get_forward_vector(),dest_transform.get_forward_vector())
relation = 'straight'
if -10 < s2d_degree < 10:
relation = 'straight'
if len(dests) == 1:
if dest not in merge_lane:
merge_lane[src] = [src, dest]
merge_lane[dest] = []
else:
merge_lane[src] = [src] + merge_lane[dest]
merge_lane[dest] = []
elif 80 < s2d_degree < 100:
relation = 'right'
elif -100 < s2d_degree < -80:
relation = 'left'
elif 170 < s2d_degree < 190:
relation = 'back'
else:
relation = 'other'
if src not in lane_relation:
lane_relation[src] = [(dest, relation)]
else:
lane_relation[src].append((dest,relation))
for i in range(6):
for lane in merge_lane.keys():
if len(merge_lane[lane]) == 0:
continue
next_lane = merge_lane[lane][-1]
while (next_lane in merge_lane and len(merge_lane[next_lane])) > 0:
merge_lane[lane] = merge_lane[lane] + merge_lane[next_lane][1:]
merge_lane[next_lane] = []
remove_selected_lanes = []
for _, lanes in merge_lane.items():
if len(lanes) < 1:
continue
target = lanes[-1]
target_lane = self.lane_id_map[target]
new_path = []
for lane_id in lanes[0:-1]:
if lane_id in self.select_lane_ids:
self.select_lane_ids.remove(lane_id)
if lane_id in lane_relation:
if len(lane_relation[lane_id]) >1 :
print('muti relation in merged lane')
lane_relation.pop(lane_id)
for idx, lane in enumerate(self.selected_map['lanes']):
if lane['lane_id'] == lane_id:
if idx == 0:
target_lane['entry'] = lane['entry']
target_lane['entryxyz'] = lane['entryxyz']
if idx == len(self.selected_map['lanes']) - 1:
target_lane['exit'] = lane['exit']
target_lane['exitxyz'] = lane['exitxyz']
new_path.extend(lane['path'])
remove_selected_lanes.append(lane)
new_path.extend(target_lane['path'])
target_lane['path'] = new_path
for item in remove_selected_lanes:
self.selected_map['lanes'].remove(item)
# print(self.selected_map['lanes'])
return lane_relation
def get_same_road_relation(self,):
same_road = []
for k, v in self.road_id_lanes.items():
same_forward = []
same_backward = []
for l, lane_id in v.items():
if lane_id in self.select_lane_ids:
if l > 0:
same_forward.append(lane_id)
else:
same_backward.append(lane_id)
if len(same_forward) != 0:
same_road.append(same_forward)
if len(same_backward) != 0:
same_road.append(same_backward)
self.same_road = same_road
return same_road
def generate_map_info(self, loc):
self.get_selected_map(loc)
lane_relation = self.get_lane_relation()
road_relation = self.get_same_road_relation()
select_lanes = []
selected_map = {}
for lane in self.selected_map['lanes']:
lane_js = {
'entryxyz':lane['entryxyz'],
'exitxyz':lane['exitxyz'],
'path': [self.waypoint2dict(w) for w in lane['path']],
'lane':lane['lane'],
'lane_id':lane['lane_id'],
'heading':lane['heading'],
'road_id':lane['road_id'],
}
select_lanes.append(lane_js)
selected_map['lanes'] = select_lanes
selected_map['lane_relations'] = lane_relation
selected_map['road'] = road_relation
return selected_map
def main():
args = argparser.parse_args()
try:
client = carla.Client(args.host, args.port)
client.set_timeout(2.0)
world = client.get_world()
m, debug = world.get_map(), world.debug
random.seed(args.seed)
loc = carla.Location(args.x, args.y, args.z)
# print("Initial location: ", loc)
scenario_map = ScenarioMap(m)
selected_map = scenario_map.generate_map_info(loc)
# print(selected_map)
print(len(selected_map['lanes']))
print(selected_map['lane_relations'])
scenario_map.visualize(debug=debug)
finally:
pass
if __name__ == '__main__':
main()