Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| from __future__ import print_function | |
| import re | |
| import shutil | |
| import sys | |
| import weakref | |
| from queue import Queue | |
| # ============================================================================== | |
| # -- imports ------------------------------------------------------------------- | |
| # ============================================================================== | |
| import cv2 | |
| import carla | |
| import numpy as np | |
| from carla import ColorConverter as cc | |
| import os | |
| import argparse | |
| import logging | |
| import time | |
| # ============================================================================== | |
| # -- Global functions ---------------------------------------------------------- | |
| # ============================================================================== | |
| def get_actor_display_name(actor, truncate=250): | |
| name = ' '.join(actor.type_id.replace('_', '.').title().split('.')[1:]) | |
| return (name[:truncate - 1] + u'\u2026') if len(name) > truncate else name | |
| # ============================================================================== | |
| # -- World --------------------------------------------------------------------- | |
| # ============================================================================== | |
| def find_weather_presets(): | |
| rgx = re.compile('.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)') | |
| name = lambda x: ' '.join(m.group(0) for m in rgx.finditer(x)) | |
| presets = [x for x in dir(carla.WeatherParameters) if re.match('[A-Z].+', x)] | |
| return [(getattr(carla.WeatherParameters, x), name(x)) for x in presets] | |
| class WorldSR(object): | |
| def __init__(self, carla_world, args): | |
| self.player_name = None | |
| self.camera_save_dir = args.save_dir | |
| self.world = carla_world | |
| self.sync = args.sync | |
| self.actor_role_name = args.rolename | |
| self.restarted = False | |
| try: | |
| self.map = self.world.get_map() | |
| except RuntimeError as error: | |
| print('RuntimeError: {}'.format(error)) | |
| print(' The server could not send the OpenDRIVE (.xodr) file:') | |
| print(' Make sure it exists, has the same name of your town, and is correct.') | |
| sys.exit(1) | |
| self.player = None | |
| self.collision_sensor = None | |
| self.lane_invasion_sensor = None | |
| self.gnss_sensor = None | |
| self.imu_sensor = None | |
| self.radar_sensor = None | |
| self.camera_manager = None | |
| self._weather_presets = find_weather_presets() | |
| self._weather_index = 0 | |
| self._actor_filter = args.filter | |
| self._actor_generation = args.generation | |
| self._gamma = args.gamma | |
| self.visualization = [] | |
| self.restart() | |
| def restart(self): | |
| if self.restarted: | |
| return | |
| self.restarted = True | |
| self.player_max_speed = 1.589 | |
| self.player_max_speed_fast = 3.713 | |
| # Keep same camera config if the camera manager exists. | |
| cam_index = self.camera_manager.index if self.camera_manager is not None else 0 | |
| cam_pos_index = self.camera_manager.transform_index if self.camera_manager is not None else 0 | |
| # Get the ego vehicle | |
| while self.player is None: | |
| print("Waiting for the ego vehicle...") | |
| time.sleep(1) | |
| possible_vehicles = self.world.get_actors().filter('vehicle.*') | |
| for vehicle in possible_vehicles: | |
| if vehicle.attributes['role_name'] == self.actor_role_name: | |
| print("Ego vehicle found") | |
| self.player = vehicle | |
| break | |
| self.player_name = self.player.type_id | |
| self.camera_manager = CameraManager(self.player, self._gamma) | |
| self.camera_manager.transform_index = cam_pos_index | |
| self.camera_manager.set_sensor(cam_index) | |
| actor_type = get_actor_display_name(self.player) | |
| if self.sync: | |
| self.world.tick() | |
| else: | |
| self.world.wait_for_tick() | |
| def render(self, frame): | |
| if len(self.world.get_actors().filter(self.player_name)) < 1: | |
| return False | |
| self.camera_manager.render(frame, self.camera_save_dir) | |
| return True | |
| def destroy(self): | |
| sensors = [ | |
| self.camera_manager.sensor,] | |
| for sensor in sensors: | |
| if sensor is not None: | |
| sensor.stop() | |
| sensor.destroy() | |
| if self.player is not None: | |
| self.player.destroy() | |
| # ============================================================================== | |
| # -- CameraManager ------------------------------------------------------------- | |
| # ============================================================================== | |
| class CameraManager(object): | |
| def __init__(self, parent_actor, gamma_correction): | |
| self.sensor = None | |
| self.img = None | |
| self.data_queue = Queue() | |
| self._parent = parent_actor | |
| self.recording = False | |
| bound_x = 0.5 + self._parent.bounding_box.extent.x | |
| bound_y = 0.5 + self._parent.bounding_box.extent.y | |
| bound_z = 0.5 + self._parent.bounding_box.extent.z | |
| Attachment = carla.AttachmentType | |
| self.wide = 1920 | |
| self.height = 1080 | |
| if not self._parent.type_id.startswith("walker.pedestrian"): | |
| self._camera_transforms = [ | |
| (carla.Transform(carla.Location(x=-2.0*bound_x, y=+0.0*bound_y, z=2.0*bound_z), carla.Rotation(pitch=8.0)), Attachment.SpringArm), | |
| (carla.Transform(carla.Location(x=+0.8*bound_x, y=+0.0*bound_y, z=1.3*bound_z)), Attachment.Rigid), | |
| (carla.Transform(carla.Location(x=+1.9*bound_x, y=+1.0*bound_y, z=1.2*bound_z)), Attachment.SpringArm), | |
| (carla.Transform(carla.Location(x=-2.8*bound_x, y=+0.0*bound_y, z=4.6*bound_z), carla.Rotation(pitch=6.0)), Attachment.SpringArm), | |
| (carla.Transform(carla.Location(x=-1.0, y=-1.0*bound_y, z=0.4*bound_z)), Attachment.Rigid)] | |
| else: | |
| self._camera_transforms = [ | |
| (carla.Transform(carla.Location(x=-2.5, z=0.0), carla.Rotation(pitch=-8.0)), Attachment.SpringArm), | |
| (carla.Transform(carla.Location(x=1.6, z=1.7)), Attachment.Rigid), | |
| (carla.Transform(carla.Location(x=2.5, y=0.5, z=0.0), carla.Rotation(pitch=-8.0)), Attachment.SpringArm), | |
| (carla.Transform(carla.Location(x=-4.0, z=2.0), carla.Rotation(pitch=6.0)), Attachment.SpringArm), | |
| (carla.Transform(carla.Location(x=0, y=-2.5, z=-0.0), carla.Rotation(yaw=90.0)), Attachment.Rigid), | |
| (carla.Transform(carla.Location(x=10, y=0, z=1.7), carla.Rotation(pitch=-90.0)), Attachment.Rigid) # bev camera | |
| ] | |
| self.transform_index = 5 | |
| self.sensors = [ | |
| ['sensor.camera.rgb', cc.Raw, 'Camera RGB', {}], | |
| ['sensor.camera.depth', cc.Raw, 'Camera Depth (Raw)', {}], | |
| ['sensor.camera.depth', cc.Depth, 'Camera Depth (Gray Scale)', {}], | |
| ['sensor.camera.depth', cc.LogarithmicDepth, 'Camera Depth (Logarithmic Gray Scale)', {}], | |
| ['sensor.camera.semantic_segmentation', cc.Raw, 'Camera Semantic Segmentation (Raw)', {}], | |
| ['sensor.camera.semantic_segmentation', cc.CityScapesPalette, | |
| 'Camera Semantic Segmentation (CityScapes Palette)', {}], | |
| ['sensor.lidar.ray_cast', None, 'Lidar (Ray-Cast)', {'range': '50'}], | |
| ['sensor.camera.dvs', cc.Raw, 'Dynamic Vision Sensor', {}], | |
| ['sensor.camera.rgb', cc.Raw, 'Camera RGB Distorted', | |
| {'lens_circle_multiplier': '3.0', | |
| 'lens_circle_falloff': '3.0', | |
| 'chromatic_aberration_intensity': '0.5', | |
| 'chromatic_aberration_offset': '0'}], | |
| ['sensor.camera.optical_flow', cc.Raw, 'Optical Flow', {}], | |
| ] | |
| world = self._parent.get_world() | |
| bp_library = world.get_blueprint_library() | |
| for item in self.sensors: | |
| bp = bp_library.find(item[0]) | |
| if item[0].startswith('sensor.camera'): | |
| bp.set_attribute('image_size_x', str(self.wide)) | |
| bp.set_attribute('image_size_y', str(self.height)) | |
| if bp.has_attribute('gamma'): | |
| bp.set_attribute('gamma', str(gamma_correction)) | |
| for attr_name, attr_value in item[3].items(): | |
| bp.set_attribute(attr_name, attr_value) | |
| elif item[0].startswith('sensor.lidar'): | |
| self.lidar_range = 50 | |
| for attr_name, attr_value in item[3].items(): | |
| bp.set_attribute(attr_name, attr_value) | |
| if attr_name == 'range': | |
| self.lidar_range = float(attr_value) | |
| item.append(bp) | |
| self.index = None | |
| # def toggle_camera(self): | |
| # self.transform_index = (self.transform_index + 1) % len(self._camera_transforms) | |
| # self.set_sensor(self.index, force_respawn=True) | |
| def set_sensor(self, index, force_respawn=False): | |
| index = index % len(self.sensors) | |
| needs_respawn = True if self.index is None else \ | |
| (force_respawn or (self.sensors[index][2] != self.sensors[self.index][2])) | |
| if needs_respawn: | |
| if self.sensor is not None: | |
| self.sensor.destroy() | |
| self.surface = None | |
| self.sensor = self._parent.get_world().spawn_actor( | |
| self.sensors[index][-1], | |
| self._camera_transforms[self.transform_index][0], | |
| attach_to=self._parent, | |
| attachment_type=self._camera_transforms[self.transform_index][1]) | |
| # We need to pass the lambda a weak reference to self to avoid | |
| # circular reference. | |
| weak_self = weakref.ref(self) | |
| self.sensor.listen(lambda image: CameraManager._parse_image(weak_self, image)) | |
| self.index = index | |
| def next_sensor(self): | |
| self.set_sensor(self.index + 1) | |
| def await_data(self, frame, timeout=8): | |
| timeout = time.time() + timeout * 60 | |
| while time.time() < timeout: | |
| recv_frame, data = self.data_queue.get(timeout=8) | |
| if int(frame) == int(recv_frame): | |
| self.img = data | |
| return | |
| def render(self, frame, save_dir): | |
| self.await_data(frame) | |
| img = self.img | |
| cv2.imwrite(os.path.join(save_dir, str(frame).zfill(4) + '.png'), img) | |
| def _parse_image(weak_self, image): | |
| self = weak_self() | |
| if not self: | |
| return | |
| if self.sensors[self.index][0].startswith('sensor.lidar'): | |
| points = np.frombuffer(image.raw_data, dtype=np.dtype('f4')) | |
| points = np.reshape(points, (int(points.shape[0] / 4), 4)) | |
| lidar_data = np.array(points[:, :2]) | |
| elif self.sensors[self.index][0].startswith('sensor.camera.dvs'): | |
| # Example of converting the raw_data from a carla.DVSEventArray | |
| # sensor into a NumPy array and using it as an image | |
| dvs_events = np.frombuffer(image.raw_data, dtype=np.dtype([ | |
| ('x', np.uint16), ('y', np.uint16), ('t', np.int64), ('pol', np.bool)])) | |
| dvs_img = np.zeros((image.height, image.width, 3), dtype=np.uint8) | |
| # Blue is positive, red is negative | |
| dvs_img[dvs_events[:]['y'], dvs_events[:]['x'], dvs_events[:]['pol'] * 2] = 255 | |
| elif self.sensors[self.index][0].startswith('sensor.camera.optical_flow'): | |
| image = image.get_color_coded_flow() | |
| array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) | |
| array = np.reshape(array, (image.height, image.width, 4)) | |
| array = array[:, :, :3] | |
| array = array[:, :, ::-1] | |
| else: # rgb' | |
| image.convert(self.sensors[self.index][1]) | |
| array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) | |
| array = np.reshape(array, (image.height, image.width, 4))[:,:,:3] | |
| self.data_queue.put((image.frame, array)) | |
| # ============================================================================== | |
| # -- game_loop() --------------------------------------------------------------- | |
| # ============================================================================== | |
| def game_loop(args): | |
| world = None | |
| try: | |
| client = carla.Client(args.host, args.port) | |
| client.set_timeout(2.0) | |
| sim_world = client.get_world() | |
| world = WorldSR(sim_world, args) | |
| world.player.set_autopilot(True) | |
| world.player.set_light_state(carla.VehicleLightState.NONE) | |
| for i in range(20 * 30): | |
| if args.sync: | |
| frame = sim_world.tick() | |
| else: | |
| frame = sim_world.wait_for_tick().frame | |
| world.render(frame) | |
| except BaseException as e: | |
| print(e) | |
| finally: | |
| if world is not None: | |
| # prevent destruction of ego vehicle | |
| if args.keep_ego_vehicle: | |
| world.player = None | |
| world.destroy() | |
| # ============================================================================== | |
| # -- main() -------------------------------------------------------------------- | |
| # ============================================================================== | |
| def main(): | |
| argparser = argparse.ArgumentParser( | |
| description='CARLA Manual Control Client') | |
| argparser.add_argument( | |
| '-v', '--verbose', | |
| action='store_true', | |
| dest='debug', | |
| help='print debug information') | |
| argparser.add_argument( | |
| '--host', | |
| metavar='H', | |
| default='127.0.0.1', | |
| help='IP of the host server (default: 127.0.0.1)') | |
| argparser.add_argument( | |
| '-p', '--port', | |
| metavar='P', | |
| default=2000, | |
| type=int, | |
| help='TCP port to listen to (default: 2000)') | |
| argparser.add_argument( | |
| '-a', '--autopilot', | |
| action='store_true', | |
| help='enable autopilot') | |
| argparser.add_argument( | |
| '--generation', | |
| metavar='G', | |
| default='2', | |
| help='restrict to certain actor generation (values: "1","2","All" - default: "2")') | |
| argparser.add_argument( | |
| '--gamma', | |
| default=2.2, | |
| type=float, | |
| help='Gamma correction of the camera (default: 2.2)') | |
| argparser.add_argument( | |
| '-s', '--seed', | |
| help='Set seed for repeating executions (default: None)', | |
| default=None, | |
| type=int) | |
| argparser.add_argument( | |
| '--sync', | |
| action='store_true', | |
| help='Activate synchronous mode execution') | |
| argparser.add_argument( | |
| '--rolename', | |
| metavar='NAME', | |
| default='hero', | |
| help='role name of ego vehicle to control (default: "hero")') | |
| argparser.add_argument( | |
| '--keep_ego_vehicle', | |
| action='store_true', | |
| help='do not destroy ego vehicle on exit') | |
| argparser.add_argument( | |
| '--save_dir', | |
| default='./default_save_dir/') | |
| argparser.add_argument( | |
| '--res', | |
| metavar='WIDTHxHEIGHT', | |
| default='1280x720', | |
| help='window resolution (default: 1280x720)') | |
| args = argparser.parse_args() | |
| args.filter = "vehicle.*" # Needed for CARLA version | |
| args.width, args.height = [int(x) for x in args.res.split('x')] | |
| log_level = logging.DEBUG if args.debug else logging.INFO | |
| logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level) | |
| logging.info('listening to server %s:%s', args.host, args.port) | |
| if os.path.exists(args.save_dir): | |
| shutil.rmtree(args.save_dir) | |
| else: | |
| os.makedirs(args.save_dir) | |
| print('creating', args.save_dir) | |
| try: | |
| game_loop(args) | |
| except Exception as error: | |
| logging.exception(error) | |
| if __name__ == '__main__': | |
| main() | |