# NOTE: the lines below were scraped from a Hugging Face Spaces status page
# ("Spaces: Runtime error") — they are page residue, not part of the source.
import asyncio
import os
import time

import ray

from clip_transform import CLIPTransform
@ray.remote
class CharlesActor:
    """Long-running Ray actor driving the perception/response loop.

    Audio frames from a Streamlit AV queue are transcribed (Vosk actor) and
    finished utterances are forwarded to a response actor; video frames are
    embedded with CLIP and matched against prototypes. Status is exposed via
    get_state() for the polling loop in ``__main__``.

    NOTE(review): ``__main__`` creates this class via
    ``CharlesActor.options(...).remote()``, which only exists on a Ray actor
    class — the ``@ray.remote`` decorator was missing and would raise
    ``AttributeError: options`` at startup.
    """

    def __init__(self):
        # Heavy resources (queues, helper actors) are created lazily in
        # _initialize_resources() the first time start() runs.
        self._needs_init = True
        self._system_one_audio_history_output = ""
        self._state = "Initializing"
        self._clip_transform = CLIPTransform()

    def get_state(self):
        """Return the current human-readable status string."""
        return self._state

    def get_system_one_audio_history_output(self):
        """Return the markdown table of recently transcribed prompts."""
        return self._system_one_audio_history_output

    async def _initialize_resources(self):
        """Create the AV queue, response actor, speech-to-text actor and
        prototypes.

        Imports are deferred so the heavy modules are only loaded when the
        actor actually starts. (Renamed from the original ``_initalize``
        typo; the only call site is in start() below.)
        """
        print("000 - create StreamlitAVQueue")
        from streamlit_av_queue import StreamlitAVQueue
        self._streamlit_av_queue = StreamlitAVQueue()
        self._out_audio_queue = self._streamlit_av_queue.get_out_audio_queue()
        print("001 - create RespondToPromptActor")
        from respond_to_prompt_actor import RespondToPromptActor
        self._respond_to_prompt_actor = RespondToPromptActor.remote(self._out_audio_queue)
        print("002 - create SpeechToTextVoskActor")
        from speech_to_text_vosk_actor import SpeechToTextVoskActor
        self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")
        # self._speech_to_text_actor = SpeechToTextVoskActor.remote("big")
        # Scripted prompts for debugging; empty by default.
        self._debug_queue = [
            # "hello, how are you today?",
            # "hmm, interesting, tell me more about that.",
        ]
        print("003 - create Prototypes")
        from prototypes import Prototypes
        self._prototypes = Prototypes()
        print("010")
        # BUG FIX: this was ``True`` in the original, so any subsequent
        # start() call would re-run the whole initialization.
        self._needs_init = False
        self._state = "Initialized"

    async def start(self):
        """Run the main processing loop; never returns in normal operation.

        Each iteration: drain debug prompts, ship new audio to the
        speech-to-text actor, harvest at most one finished transcription
        (FIFO), embed the newest video frame, then update the status string.
        """
        if self._needs_init:
            await self._initialize_resources()
        system_one_audio_history = []
        self._state = "Waiting for input"
        total_video_frames = 0
        skipped_video_frames = 0
        total_audio_frames = 0
        loops = 0
        start_time = time.time()
        vector_debug = "--n/a--"
        process_speech_to_text_future = []
        # Filler words the recognizer often emits as spurious one-word
        # utterances; loop-invariant, so built once here.
        prompts_to_ignore = ["um", "uh", "ah", "huh", "hmm", "the", "but", "by", "just", "i'm"]

        while True:
            if len(self._debug_queue) > 0:
                prompt = self._debug_queue.pop(0)
                await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)

            audio_frames = await self._streamlit_av_queue.get_in_audio_frames_async()
            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer
                audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
                future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
                process_speech_to_text_future.append(future)

            # Harvest at most one finished transcription per iteration,
            # always the oldest, to preserve utterance order.
            if len(process_speech_to_text_future) > 0:
                ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                if ready:
                    prompt, speaker_finished, raw_json = await process_speech_to_text_future[0]
                    del process_speech_to_text_future[0]
                    if speaker_finished and len(prompt) > 0 and prompt not in prompts_to_ignore:
                        print(f"Prompt: {prompt}")
                        system_one_audio_history.append(prompt)
                        # Keep only the 10 most recent prompts (the original
                        # used a ``while`` loop, but one slice suffices).
                        if len(system_one_audio_history) > 10:
                            system_one_audio_history = system_one_audio_history[-10:]
                        table_content = "| System 1 Audio History |\n| --- |\n"
                        table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
                        self._system_one_audio_history_output = table_content
                        await self._respond_to_prompt_actor.enqueue_prompt.remote(prompt)

            video_frames = await self._streamlit_av_queue.get_video_frames_async()
            if len(video_frames) > 0:
                vector_debug = f"found {len(video_frames)} video frames"
                # Only the newest frame is embedded; the rest are counted as
                # skipped.
                total_video_frames += 1
                skipped_video_frames += (len(video_frames) - 1)
                image_as_array = video_frames[-1]
                image_vector = self._clip_transform.image_to_embeddings(image_as_array)
                image_vector = image_vector[0]
                distances, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector)
                vector_debug = f"{closest_item_key} {distance_debug_str}"

            # Yield to the event loop so awaited queues can make progress.
            await asyncio.sleep(0.01)
            loops += 1
            self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. {vector_debug}"
if __name__ == "__main__":
    if not ray.is_initialized():
        # Connect to a running cluster when RAY_ADDRESS is set; otherwise
        # start a local Ray instance. The shared namespace lets the named
        # actor be found/reused across processes.
        ray_address = os.getenv('RAY_ADDRESS')
        if ray_address:
            # Explicit keyword: the first positional of ray.init() is the
            # cluster address, but naming it is clearer and safer.
            ray.init(address=ray_address, namespace="project_charles")
        else:
            ray.init(namespace="project_charles")

    # Named actor: get_if_exists=True reattaches to a live "CharlesActor"
    # instead of failing when one already exists.
    # NOTE(review): this requires CharlesActor to be a Ray actor class
    # (decorated with @ray.remote) — confirm.
    charles_actor = CharlesActor.options(
        name="CharlesActor",
        get_if_exists=True,
    ).remote()
    future = charles_actor.start.remote()

    try:
        while True:
            # Non-blocking check whether start() has terminated.
            ready, _ = ray.wait([future], timeout=0)
            if ready:
                # The start method has terminated. ray.get() re-raises any
                # exception the remote method raised.
                try:
                    result = ray.get(future)
                    print(f"The start method has terminated with result: {result}")
                except Exception as e:
                    print(f"The start method raised an exception: {e}")
                break
            else:
                # Still running: poll once a second for debug information.
                time.sleep(1)
                state = charles_actor.get_state.remote()
                print(f"Charles is in state: {ray.get(state)}")
    except KeyboardInterrupt:
        print("Script was manually terminated")
        # Bare raise preserves the original traceback (the original used
        # ``raise(e)``, which rebinds the exception).
        raise