| import multiprocessing as mp |
| from multiprocessing import shared_memory |
| import time |
| from typing import Any, Dict |
|
|
| import numpy as np |
|
|
| from .sensor_utils import ImageMessageSchema, ImageUtils, SensorServer |
|
|
|
|
| def get_multiprocessing_info(verbose: bool = True): |
| """Get information about multiprocessing start methods""" |
|
|
| if verbose: |
| print(f"Available start methods: {mp.get_all_start_methods()}") |
| return mp.get_start_method() |
|
|
|
|
| class ImagePublishProcess: |
| """Subprocess for publishing images using shared memory and ZMQ""" |
|
|
| def __init__( |
| self, |
| camera_configs: Dict[str, Any], |
| image_dt: float, |
| zmq_port: int = 5555, |
| start_method: str = "spawn", |
| verbose: bool = False, |
| ): |
| self.camera_configs = camera_configs |
| self.image_dt = image_dt |
| self.zmq_port = zmq_port |
| self.verbose = verbose |
| self.shared_memory_blocks = {} |
| self.shared_memory_info = {} |
| self.process = None |
|
|
| |
| self.mp_context = mp.get_context(start_method) |
| if self.verbose: |
| print(f"Using multiprocessing context: {start_method}") |
|
|
| self.stop_event = self.mp_context.Event() |
| self.data_ready_event = self.mp_context.Event() |
|
|
| |
| self.stop_event.clear() |
| self.data_ready_event.clear() |
|
|
| if self.verbose: |
| print(f"Initial stop_event state: {self.stop_event.is_set()}") |
| print(f"Initial data_ready_event state: {self.data_ready_event.is_set()}") |
|
|
| |
| for camera_name, camera_config in camera_configs.items(): |
| height = camera_config["height"] |
| width = camera_config["width"] |
| |
| size = height * width * 3 |
|
|
| |
| shm = shared_memory.SharedMemory(create=True, size=size) |
| self.shared_memory_blocks[camera_name] = shm |
| self.shared_memory_info[camera_name] = { |
| "name": shm.name, |
| "size": size, |
| "shape": (height, width, 3), |
| "dtype": np.uint8, |
| } |
|
|
| def start_process(self): |
| """Start the image publishing subprocess""" |
| if self.verbose: |
| print(f"Starting subprocess with stop_event state: {self.stop_event.is_set()}") |
| self.process = self.mp_context.Process( |
| target=self._image_publish_worker, |
| args=( |
| self.shared_memory_info, |
| self.image_dt, |
| self.zmq_port, |
| self.stop_event, |
| self.data_ready_event, |
| self.verbose, |
| ), |
| ) |
| self.process.start() |
| if self.verbose: |
| print(f"Subprocess started, PID: {self.process.pid}") |
|
|
| def update_shared_memory(self, render_caches: Dict[str, np.ndarray]): |
| """Update shared memory with new rendered images""" |
| images_updated = 0 |
| for camera_name in self.camera_configs.keys(): |
| image_key = f"{camera_name}_image" |
| if image_key in render_caches: |
| image = render_caches[image_key] |
|
|
| |
| if image.dtype != np.uint8: |
| image = (image * 255).astype(np.uint8) |
|
|
| |
| shm = self.shared_memory_blocks[camera_name] |
| shared_array = np.ndarray( |
| self.shared_memory_info[camera_name]["shape"], |
| dtype=self.shared_memory_info[camera_name]["dtype"], |
| buffer=shm.buf, |
| ) |
|
|
| |
| np.copyto(shared_array, image) |
| images_updated += 1 |
|
|
| |
| if images_updated > 0: |
| if self.verbose: |
| print(f"Main process: Updated {images_updated} images, setting data_ready_event") |
| self.data_ready_event.set() |
| elif self.verbose: |
| print( |
| "Main process: No images to update. " |
| "please check if camera configs are provided and the renderer is properly initialized" |
| ) |
|
|
| def stop(self): |
| """Stop the image publishing subprocess""" |
| if self.verbose: |
| print("Stopping image publishing subprocess...") |
| self.stop_event.set() |
|
|
| if self.process and self.process.is_alive(): |
| |
| self.process.join(timeout=5) |
| if self.process.is_alive(): |
| if self.verbose: |
| print("Subprocess didn't stop gracefully, terminating...") |
| self.process.terminate() |
| self.process.join(timeout=2) |
| if self.process.is_alive(): |
| if self.verbose: |
| print("Force killing subprocess...") |
| self.process.kill() |
| self.process.join() |
|
|
| |
| for camera_name, shm in self.shared_memory_blocks.items(): |
| try: |
| shm.close() |
| shm.unlink() |
| if self.verbose: |
| print(f"Cleaned up shared memory for {camera_name}") |
| except Exception as e: |
| if self.verbose: |
| print(f"Warning: Failed to cleanup shared memory for {camera_name}: {e}") |
|
|
| self.shared_memory_blocks.clear() |
| if self.verbose: |
| print("Image publishing subprocess stopped and cleaned up") |
|
|
| @staticmethod |
| def _image_publish_worker( |
| shared_memory_info, image_dt, zmq_port, stop_event, data_ready_event, verbose |
| ): |
| """Worker function that runs in the subprocess""" |
| |
| from .sensor_utils import ImageMessageSchema, ImageUtils, SensorServer |
| |
| if verbose: |
| print(f"Worker started! PID: {__import__('os').getpid()}") |
| print(f"Worker stop_event state at start: {stop_event.is_set()}") |
| print(f"Worker data_ready_event state at start: {data_ready_event.is_set()}") |
|
|
| try: |
| |
| sensor_server = SensorServer() |
| sensor_server.start_server(port=zmq_port) |
|
|
| |
| shared_arrays = {} |
| shm_blocks = {} |
| for camera_name, info in shared_memory_info.items(): |
| shm = shared_memory.SharedMemory(name=info["name"]) |
| shm_blocks[camera_name] = shm |
| shared_arrays[camera_name] = np.ndarray( |
| info["shape"], dtype=info["dtype"], buffer=shm.buf |
| ) |
|
|
| print( |
| f"Image publishing subprocess started with {len(shared_arrays)} cameras on ZMQ port {zmq_port}" |
| ) |
|
|
| loop_count = 0 |
| last_data_time = time.time() |
|
|
| while not stop_event.is_set(): |
| loop_count += 1 |
|
|
| |
| timeout = min(image_dt, 0.1) |
| data_available = data_ready_event.wait(timeout=timeout) |
|
|
| current_time = time.time() |
|
|
| if data_available: |
| data_ready_event.clear() |
| if loop_count % 50 == 0: |
| print("Image publish frequency: ", 1 / (current_time - last_data_time)) |
| last_data_time = current_time |
|
|
| |
| try: |
| |
| image_copies = {name: arr.copy() for name, arr in shared_arrays.items()} |
|
|
| |
| message_dict = { |
| "images": image_copies, |
| "timestamps": {name: current_time for name in image_copies.keys()}, |
| } |
|
|
| |
| image_msg = ImageMessageSchema( |
| timestamps=message_dict.get("timestamps"), |
| images=message_dict.get("images", None), |
| ) |
|
|
| |
| serialized_data = image_msg.serialize() |
|
|
| |
| for camera_name, image_copy in image_copies.items(): |
| serialized_data[f"{camera_name}"] = ImageUtils.encode_image(image_copy) |
|
|
| sensor_server.send_message(serialized_data) |
|
|
| except Exception as e: |
| print(f"Error publishing images: {e}") |
|
|
| elif verbose and loop_count % 10 == 0: |
| print(f"Subprocess: Still waiting for data... (iteration {loop_count})") |
|
|
| |
| if not data_available: |
| time.sleep(0.001) |
|
|
| except KeyboardInterrupt: |
| print("Image publisher interrupted by user") |
| finally: |
| |
| try: |
| for shm in shm_blocks.values(): |
| shm.close() |
| sensor_server.stop_server() |
| except Exception as e: |
| print(f"Error during subprocess cleanup: {e}") |
| if verbose: |
| print("Image publish subprocess stopped") |
|
|