| from typing import Optional, Callable, Dict |
| import os |
| import enum |
| import time |
| import json |
| import numpy as np |
| import pyrealsense2 as rs |
| import multiprocess as mp |
| import cv2 |
| from threadpoolctl import threadpool_limits |
| from multiprocess.managers import SharedMemoryManager |
|
|
| from .utils import get_accumulate_timestamp_idxs |
| from .shared_memory.shared_ndarray import SharedNDArray |
| from .shared_memory.shared_memory_ring_buffer import SharedMemoryRingBuffer |
| from .shared_memory.shared_memory_queue import SharedMemoryQueue, Full, Empty |
|
|
| class Command(enum.Enum): |
| SET_COLOR_OPTION = 0 |
| SET_DEPTH_OPTION = 1 |
| START_RECORDING = 2 |
| STOP_RECORDING = 3 |
| RESTART_PUT = 4 |
|
|
| class SingleRealsense(mp.Process): |
| MAX_PATH_LENGTH = 4096 |
|
|
| def __init__( |
| self, |
| shm_manager: SharedMemoryManager, |
| serial_number, |
| resolution=(1280,720), |
| capture_fps=30, |
| put_fps=None, |
| put_downsample=True, |
| enable_color=True, |
| enable_depth=False, |
| process_depth=False, |
| enable_infrared=False, |
| get_max_k=30, |
| advanced_mode_config=None, |
| transform: Optional[Callable[[Dict], Dict]] = None, |
| vis_transform: Optional[Callable[[Dict], Dict]] = None, |
| is_master=False, |
| verbose=False |
| ): |
| super().__init__() |
|
|
| if put_fps is None: |
| put_fps = capture_fps |
|
|
| |
| resolution = tuple(resolution) |
| shape = resolution[::-1] |
| examples = dict() |
| if enable_color: |
| examples['color'] = np.empty( |
| shape=shape+(3,), dtype=np.uint8) |
| if enable_depth: |
| examples['depth'] = np.empty( |
| shape=shape, dtype=np.uint16) |
| if enable_infrared: |
| examples['infrared'] = np.empty( |
| shape=shape, dtype=np.uint8) |
| examples['camera_capture_timestamp'] = 0.0 |
| examples['camera_receive_timestamp'] = 0.0 |
| examples['timestamp'] = 0.0 |
| examples['step_idx'] = 0 |
|
|
| ring_buffer = SharedMemoryRingBuffer.create_from_examples( |
| shm_manager=shm_manager, |
| examples=examples if transform is None |
| else transform(dict(examples)), |
| get_max_k=get_max_k, |
| get_time_budget=0.2, |
| put_desired_frequency=put_fps |
| ) |
|
|
| |
| examples = { |
| 'cmd': Command.SET_COLOR_OPTION.value, |
| 'option_enum': rs.option.exposure.value, |
| 'option_value': 0.0, |
| 'put_start_time': 0.0 |
| } |
|
|
| command_queue = SharedMemoryQueue.create_from_examples( |
| shm_manager=shm_manager, |
| examples=examples, |
| buffer_size=128 |
| ) |
|
|
| |
| intrinsics_array = SharedNDArray.create_from_shape( |
| mem_mgr=shm_manager, |
| shape=(7,), |
| dtype=np.float64) |
| intrinsics_array.get()[:] = 0 |
|
|
| |
| self.serial_number = serial_number |
| self.resolution = resolution |
| self.capture_fps = capture_fps |
| self.put_fps = put_fps |
| self.put_downsample = put_downsample |
| self.enable_color = enable_color |
| self.enable_depth = enable_depth |
| self.enable_infrared = enable_infrared |
| self.advanced_mode_config = advanced_mode_config |
| self.transform = transform |
| self.vis_transform = vis_transform |
| self.process_depth = process_depth |
| self.is_master = is_master |
| self.verbose = verbose |
| self.put_start_time = None |
|
|
| |
| self.stop_event = mp.Event() |
| self.ready_event = mp.Event() |
| self.ring_buffer = ring_buffer |
| self.command_queue = command_queue |
| self.intrinsics_array = intrinsics_array |
| |
| @staticmethod |
| def get_connected_devices_serial(): |
| serials = list() |
| for d in rs.context().devices: |
| if d.get_info(rs.camera_info.name).lower() != 'platform camera': |
| serial = d.get_info(rs.camera_info.serial_number) |
| product_line = d.get_info(rs.camera_info.product_line) |
| if product_line == 'D400': |
| |
| serials.append(serial) |
| serials = sorted(serials) |
| return serials |
|
|
| |
| def __enter__(self): |
| self.start() |
| return self |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| self.stop() |
|
|
| |
| def start(self, wait=True, put_start_time=None): |
| self.put_start_time = put_start_time |
| super().start() |
| if wait: |
| self.start_wait() |
| |
| def stop(self, wait=True): |
| self.stop_event.set() |
| if wait: |
| self.end_wait() |
|
|
| def start_wait(self): |
| self.ready_event.wait() |
| |
| def end_wait(self): |
| self.join() |
|
|
| @property |
| def is_ready(self): |
| return self.ready_event.is_set() |
|
|
| def get(self, k=None, out=None): |
| if k is None: |
| return self.ring_buffer.get(out=out) |
| else: |
| return self.ring_buffer.get_last_k(k, out=out) |
| |
| |
| def set_color_option(self, option: rs.option, value: float): |
| self.command_queue.put({ |
| 'cmd': Command.SET_COLOR_OPTION.value, |
| 'option_enum': option.value, |
| 'option_value': value |
| }) |
| |
| def set_exposure(self, exposure=None, gain=None): |
| """ |
| exposure: (1, 10000) 100us unit. (0.1 ms, 1/10000s) |
| gain: (0, 128) |
| """ |
|
|
| if exposure is None and gain is None: |
| |
| self.set_color_option(rs.option.enable_auto_exposure, 1.0) |
| else: |
| |
| self.set_color_option(rs.option.enable_auto_exposure, 0.0) |
| if exposure is not None: |
| self.set_color_option(rs.option.exposure, exposure) |
| if gain is not None: |
| self.set_color_option(rs.option.gain, gain) |
| |
| def set_white_balance(self, white_balance=None): |
| if white_balance is None: |
| self.set_color_option(rs.option.enable_auto_white_balance, 1.0) |
| else: |
| self.set_color_option(rs.option.enable_auto_white_balance, 0.0) |
| self.set_color_option(rs.option.white_balance, white_balance) |
|
|
| def get_intrinsics(self): |
| assert self.ready_event.is_set() |
| fx, fy, ppx, ppy = self.intrinsics_array.get()[:4] |
| mat = np.eye(3) |
| mat[0,0] = fx |
| mat[1,1] = fy |
| mat[0,2] = ppx |
| mat[1,2] = ppy |
| return mat |
|
|
| def get_depth_scale(self): |
| assert self.ready_event.is_set() |
| scale = self.intrinsics_array.get()[-1] |
| return scale |
| |
| def depth_process(self, depth_frame): |
| depth_to_disparity = rs.disparity_transform(True) |
| disparity_to_depth = rs.disparity_transform(False) |
| |
| spatial = rs.spatial_filter() |
| spatial.set_option(rs.option.filter_magnitude, 5) |
| spatial.set_option(rs.option.filter_smooth_alpha, 0.75) |
| spatial.set_option(rs.option.filter_smooth_delta, 1) |
| spatial.set_option(rs.option.holes_fill, 1) |
| |
| temporal = rs.temporal_filter() |
| temporal.set_option(rs.option.filter_smooth_alpha, 0.75) |
| temporal.set_option(rs.option.filter_smooth_delta, 1) |
|
|
| filtered_depth = depth_to_disparity.process(depth_frame) |
| filtered_depth = spatial.process(filtered_depth) |
| filtered_depth = temporal.process(filtered_depth) |
| filtered_depth = disparity_to_depth.process(filtered_depth) |
| return filtered_depth |
|
|
| def restart_put(self, start_time): |
| self.command_queue.put({ |
| 'cmd': Command.RESTART_PUT.value, |
| 'put_start_time': start_time |
| }) |
| |
| |
| def run(self): |
| |
| threadpool_limits(1) |
| cv2.setNumThreads(1) |
| w, h = self.resolution |
| fps = self.capture_fps |
| align = rs.align(rs.stream.color) |
| |
| rs_config = rs.config() |
| if self.enable_color: |
| rs_config.enable_stream(rs.stream.color, |
| w, h, rs.format.bgr8, fps) |
| if self.enable_depth: |
| rs_config.enable_stream(rs.stream.depth, |
| w, h, rs.format.z16, fps) |
| if self.enable_infrared: |
| rs_config.enable_stream(rs.stream.infrared, |
| w, h, rs.format.y8, fps) |
| |
| def init_device(): |
| rs_config.enable_device(self.serial_number) |
|
|
| |
| pipeline = rs.pipeline() |
| pipeline_profile = pipeline.start(rs_config) |
| self.pipeline = pipeline |
| self.pipeline_profile = pipeline_profile |
|
|
| |
| |
| d = self.pipeline_profile.get_device().first_color_sensor() |
| d.set_option(rs.option.global_time_enabled, 1) |
|
|
| |
| if self.advanced_mode_config is not None: |
| json_text = json.dumps(self.advanced_mode_config) |
| device = self.pipeline_profile.get_device() |
| advanced_mode = rs.rs400_advanced_mode(device) |
| advanced_mode.load_json(json_text) |
|
|
| |
| color_stream = self.pipeline_profile.get_stream(rs.stream.color) |
| intr = color_stream.as_video_stream_profile().get_intrinsics() |
| order = ['fx', 'fy', 'ppx', 'ppy', 'height', 'width'] |
| for i, name in enumerate(order): |
| self.intrinsics_array.get()[i] = getattr(intr, name) |
|
|
| if self.enable_depth: |
| depth_sensor = self.pipeline_profile.get_device().first_depth_sensor() |
| depth_scale = depth_sensor.get_depth_scale() |
| self.intrinsics_array.get()[-1] = depth_scale |
| |
| |
| if self.verbose: |
| print(f'[SingleRealsense {self.serial_number}] Main loop started.') |
|
|
| try: |
| init_device() |
| |
| put_idx = None |
| put_start_time = self.put_start_time |
| if put_start_time is None: |
| put_start_time = time.time() |
|
|
| iter_idx = 0 |
| t_start = time.time() |
| while not self.stop_event.is_set(): |
| |
| frameset = None |
| while frameset is None: |
| try: |
| frameset = self.pipeline.wait_for_frames() |
| except RuntimeError as e: |
| print(f'[SingleRealsense {self.serial_number}] Error: {e}. Ready state: {self.ready_event.is_set()}, Restarting device.') |
| device = self.pipeline.get_active_profile().get_device() |
| device.hardware_reset() |
| self.pipeline.stop() |
| init_device() |
| continue |
| receive_time = time.time() |
| |
| frameset = align.process(frameset) |
|
|
| self.ring_buffer.ready_for_get = (receive_time - put_start_time >= 0) |
|
|
| |
| if self.verbose: |
| grad_start_time = time.time() |
| data = dict() |
| data['camera_receive_timestamp'] = receive_time |
| |
| data['camera_capture_timestamp'] = frameset.get_timestamp() / 1000 |
| if self.enable_color: |
| |
| color_frame = frameset.get_color_frame() |
| data['color'] = np.asarray(color_frame.get_data()) |
| t = color_frame.get_timestamp() / 1000 |
| data['camera_capture_timestamp'] = t |
| |
| |
| if self.enable_depth: |
| depth_frame = frameset.get_depth_frame() |
| if self.process_depth: |
| data['depth'] = self.depth_process(depth_frame) |
| data['depth'] = np.asarray(depth_frame.get_data()) |
| |
| |
| if self.enable_infrared: |
| data['infrared'] = np.asarray( |
| frameset.get_infrared_frame().get_data()) |
| if self.verbose: |
| print(f'[SingleRealsense {self.serial_number}] Grab data time {time.time() - grad_start_time}') |
| |
| |
| if self.verbose: |
| transform_start_time = time.time() |
| put_data = data |
| if self.transform is not None: |
| put_data = self.transform(dict(data)) |
| if self.verbose: |
| print(f'[SingleRealsense {self.serial_number}] Transform time {time.time() - transform_start_time}') |
|
|
| if self.verbose: |
| put_data_start_time = time.time() |
| if self.put_downsample: |
| |
| |
| local_idxs, global_idxs, put_idx \ |
| = get_accumulate_timestamp_idxs( |
| timestamps=[receive_time], |
| start_time=put_start_time, |
| dt=1/self.put_fps, |
| |
| |
| next_global_idx=put_idx, |
| |
| |
| allow_negative=True |
| ) |
| for step_idx in global_idxs: |
| put_data['step_idx'] = step_idx |
| |
| put_data['timestamp'] = receive_time |
| |
| self.ring_buffer.put(put_data, wait=False, serial_number=self.serial_number) |
| else: |
| step_idx = int((receive_time - put_start_time) * self.put_fps) |
| print(step_idx, receive_time) |
| put_data['step_idx'] = step_idx |
| put_data['timestamp'] = receive_time |
| self.ring_buffer.put(put_data, wait=False, serial_number=self.serial_number) |
| if self.verbose: |
| print(f'[SingleRealsense {self.serial_number}] Put data time {time.time() - put_data_start_time}', end=' ') |
| print(f'with downsample for {len(global_idxs)}x' if self.put_downsample and len(global_idxs) > 1 else '') |
|
|
| |
| if iter_idx == 0: |
| self.ready_event.set() |
|
|
| |
| t_end = time.time() |
| duration = t_end - t_start |
| frequency = np.round(1 / duration, 1) |
| t_start = t_end |
| if self.verbose: |
| print(f'[SingleRealsense {self.serial_number}] FPS {frequency}') |
|
|
| |
| try: |
| commands = self.command_queue.get_all() |
| n_cmd = len(commands['cmd']) |
| except Empty: |
| n_cmd = 0 |
|
|
| |
| for i in range(n_cmd): |
| command = dict() |
| for key, value in commands.items(): |
| command[key] = value[i] |
| cmd = command['cmd'] |
| if cmd == Command.SET_COLOR_OPTION.value: |
| sensor = self.pipeline_profile.get_device().first_color_sensor() |
| option = rs.option(command['option_enum']) |
| value = float(command['option_value']) |
| sensor.set_option(option, value) |
| |
| |
| |
| elif cmd == Command.SET_DEPTH_OPTION.value: |
| sensor = self.pipeline_profile.get_device().first_depth_sensor().set_option(rs.option.inter_cam_sync_mode, 1 if self.is_master else 2) |
| option = rs.option(command['option_enum']) |
| value = float(command['option_value']) |
| sensor.set_option(option, value) |
| elif cmd == Command.RESTART_PUT.value: |
| put_idx = None |
| put_start_time = command['put_start_time'] |
|
|
| iter_idx += 1 |
| finally: |
| rs_config.disable_all_streams() |
| self.ready_event.set() |
| |
| if self.verbose: |
| print(f'[SingleRealsense {self.serial_number}] Exiting worker process.') |
|
|