Spaces:
Runtime error
Runtime error
| import time | |
| import ray | |
| from ray.util.queue import Queue, Empty | |
| from ray.actor import ActorHandle | |
| import numpy as np | |
| import pid_helper | |
| # Ray Queue's take ~.5 seconds to splin up; | |
| # this class creates a pool of queues to cycle through | |
| class QueueFactory: | |
| def __init__(self, max_size:int): | |
| self.queues:[Queue] = [] | |
| self.queue_size = 5 | |
| self.max_size = max_size | |
| while len(self.queues) < self.queue_size: | |
| self.queues.append(Queue(maxsize=max_size)) | |
| def get_queue(self)->Queue: | |
| queue = self.queues.pop(0) | |
| self.queues.append(Queue(maxsize=self.max_size)) | |
| return queue | |
| class AppInterfaceActor: | |
| def __init__(self): | |
| self.audio_input_queue = Queue(maxsize=3000) # Adjust the size as needed | |
| self.video_input_queue = Queue(maxsize=10) # Adjust the size as needed | |
| self.audio_output_queue_factory = QueueFactory(max_size=50) | |
| self.audio_output_queue = self.audio_output_queue_factory.get_queue() | |
| self.video_output_queue = Queue(maxsize=10) # Adjust the size as needed | |
| self.debug_str = "" | |
| self.state = "Initializing" | |
| self.charles_app_pids = [] | |
| def get_singleton(): | |
| return AppInterfaceActor.options( | |
| name="AppInterfaceActor", | |
| get_if_exists=True, | |
| ).remote() | |
| # functions for UI to enqueue input, dequeue output | |
| async def enqueue_video_input_frame(self, shared_tensor_ref): | |
| if self.video_input_queue.full(): | |
| evicted_item = await self.video_input_queue.get_async() | |
| del evicted_item | |
| await self.video_input_queue.put_async(shared_tensor_ref) | |
| async def enqueue_audio_input_frame(self, shared_buffer_ref): | |
| if self.audio_input_queue.full(): | |
| evicted_item = await self.audio_input_queue.get_async() | |
| del evicted_item | |
| await self.audio_input_queue.put_async(shared_buffer_ref) | |
| async def dequeue_audio_output_frame_async(self): | |
| start_time = time.time() | |
| try: | |
| frame = await self.audio_output_queue.get_async(block=False) | |
| except Empty: | |
| frame = None | |
| elapsed_time = time.time() - start_time | |
| if elapsed_time > 0.1: | |
| print (f"dequeue_audio_output_frame_async time: {elapsed_time}. was empty: {frame is None}. frame type: {type(frame) if frame else str(0)}") | |
| return frame | |
| async def dequeue_video_output_frames_async(self): | |
| video_frames = [] | |
| if self.video_output_queue.empty(): | |
| return video_frames | |
| while not self.video_output_queue.empty(): | |
| shared_tensor = await self.video_output_queue.get_async() | |
| video_frames.append(shared_tensor) | |
| return video_frames | |
| # functions for application to dequeue input, enqueue output | |
| def get_audio_output_queue(self)->Queue: | |
| return self.audio_output_queue | |
| async def cycle_output_queue(self)->Queue: | |
| self.audio_output_queue.shutdown(grace_period_s=0.1) | |
| self.audio_output_queue = self.audio_output_queue_factory.get_queue() | |
| return self.audio_output_queue | |
| async def enqueue_video_output_frame(self, shared_tensor_ref): | |
| if self.video_output_queue.full(): | |
| evicted_item = await self.video_output_queue.get_async() | |
| del evicted_item | |
| await self.video_output_queue.put_async(shared_tensor_ref) | |
| async def dequeue_audio_input_frames_async(self): | |
| audio_frames = [] | |
| if self.audio_input_queue.empty(): | |
| return audio_frames | |
| while not self.audio_input_queue.empty(): | |
| shared_tensor = await self.audio_input_queue.get_async() | |
| audio_frames.append(shared_tensor) | |
| return audio_frames | |
| async def dequeue_video_input_frames_async(self): | |
| video_frames = [] | |
| if self.video_input_queue.empty(): | |
| return video_frames | |
| while not self.video_input_queue.empty(): | |
| shared_tensor = await self.video_input_queue.get_async() | |
| video_frames.append(shared_tensor) | |
| return video_frames | |
| # pid helpers | |
| async def add_charles_app_pid(self, pid:int): | |
| self.charles_app_pids.append(pid) | |
| async def get_charles_app_pids(self)->[int]: | |
| # prune dead pids | |
| running_charles_app_pids = [] | |
| for pid in self.charles_app_pids: | |
| if pid_helper.is_pid_running(pid): | |
| running_charles_app_pids.append(pid) | |
| self.charles_app_pids = running_charles_app_pids | |
| return self.charles_app_pids | |
| async def is_charles_app_running(self)->bool: | |
| # prune dead pids | |
| running_charles_app_pids = [] | |
| for pid in self.charles_app_pids: | |
| if pid_helper.is_pid_running(pid): | |
| running_charles_app_pids.append(pid) | |
| self.charles_app_pids = running_charles_app_pids | |
| return len(self.charles_app_pids) > 0 | |
| # debug helpers | |
| async def get_debug_output(self)->str: | |
| return self.debug_str | |
| async def set_debug_output(self, debug_str:str): | |
| self.debug_str = debug_str | |
| async def get_state(self)->str: | |
| return self.state | |
| async def set_state(self, state:str): | |
| self.state = state |