Spaces:
Runtime error
Runtime error
| import threading | |
| from queue import Queue, Empty | |
| import numpy as np | |
| import requests | |
| import base64 | |
| import time | |
| from dataclasses import dataclass, field | |
| import websocket | |
| import threading | |
| import ssl | |
| import librosa | |
| import os | |
| class AudioStreamingClient: | |
| def __init__(self): | |
| self.auth_token = os.environ.get("HF_AUTH_TOKEN", None) | |
| self.api_url = os.environ.get("HF_API_URL", None) | |
| self.stop_event = threading.Event() | |
| self.send_queue = Queue() | |
| self.recv_queue = Queue() | |
| self.session_id = None | |
| self.headers = { | |
| "Accept": "application/json", | |
| "Authorization": f"Bearer {self.auth_token}", | |
| "Content-Type": "application/json" | |
| } | |
| self.session_state = "idle" # Possible states: idle, sending, processing, waiting | |
| self.ws_ready = threading.Event() | |
| def start(self): | |
| print("Starting audio streaming...") | |
| ws_url = self.api_url.replace("http", "ws") + "/ws" | |
| self.ws = websocket.WebSocketApp( | |
| ws_url, | |
| header=[f"{key}: {value}" for key, value in self.headers.items()], | |
| on_open=self.on_open, | |
| on_message=self.on_message, | |
| on_error=self.on_error, | |
| on_close=self.on_close | |
| ) | |
| self.ws_thread = threading.Thread(target=self.ws.run_forever, kwargs={'sslopt': {"cert_reqs": ssl.CERT_NONE}}) | |
| self.ws_thread.start() | |
| # Wait for the WebSocket to be ready | |
| self.ws_ready.wait() | |
| self.send_thread = threading.Thread(target=self.send_audio) | |
| self.send_thread.start() | |
| def on_close(self): | |
| self.stop_event.set() | |
| self.send_thread.join() | |
| self.ws.close() | |
| self.ws_thread.join() | |
| print("Audio streaming stopped.") | |
| def on_open(self, ws): | |
| print("WebSocket connection opened.") | |
| self.ws_ready.set() # Signal that the WebSocket is ready | |
| def on_message(self, ws, message): | |
| # message is bytes | |
| if message == b'DONE': | |
| print("listen") | |
| self.session_state = "listen" | |
| else: | |
| print("processing") | |
| self.session_state = "processing" | |
| audio_np = np.frombuffer(message, dtype=np.int16) | |
| self.recv_queue.put(audio_np) | |
| def on_error(self, ws, error): | |
| print(f"WebSocket error: {error}") | |
| def on_close(self, ws, close_status_code, close_msg): | |
| print("WebSocket connection closed.") | |
| def send_audio(self): | |
| while not self.stop_event.is_set(): | |
| if not self.send_queue.empty(): | |
| chunk = self.send_queue.get() | |
| if self.session_state != "processing": | |
| self.ws.send(chunk.tobytes(), opcode=websocket.ABNF.OPCODE_BINARY) | |
| else: | |
| self.ws.send([], opcode=websocket.ABNF.OPCODE_BINARY) # handshake | |
| time.sleep(0.01) | |
| def put_audio(self, chunk, sample_rate): | |
| chunk = np.clip(chunk, -32768, 32767).astype(np.int16) | |
| chunk = chunk.astype(np.float32) / 32768.0 | |
| chunk = librosa.resample(chunk, orig_sr=48000, target_sr=16000) | |
| chunk = (chunk * 32768.0).astype(np.int16) | |
| self.send_queue.put(chunk) | |
| def get_audio(self, sample_rate, output_size): | |
| output_chunk = np.array([], dtype=np.int16) | |
| output_sample_rate = 16000 | |
| output_chunk_size = int(output_size*output_sample_rate/sample_rate) | |
| while output_chunk.size < output_chunk_size: | |
| try: | |
| self.ws.send([], opcode=websocket.ABNF.OPCODE_BINARY) # handshake | |
| chunk = self.recv_queue.get(timeout=0.1) | |
| except Empty: | |
| chunk = None | |
| if chunk is not None: | |
| # Ensure chunk is int16 and clip to valid range | |
| chunk_int16 = np.clip(chunk, -32768, 32767).astype(np.int16) | |
| output_chunk = np.concatenate([output_chunk, chunk_int16]) | |
| else: | |
| print("padding chunk of size ", len(output_chunk)) | |
| output_chunk = np.pad(output_chunk, (0, output_chunk_size - len(output_chunk))) | |
| output_chunk = output_chunk.astype(np.float32) / 32768.0 | |
| output_chunk = librosa.resample(output_chunk, orig_sr=output_sample_rate, target_sr=sample_rate) | |
| output_chunk = (output_chunk * 32768.0).astype(np.int16) | |
| print("output_chunk size: ", len(output_chunk)) | |
| output_chunk = output_chunk[:output_size] | |
| return np.pad(output_chunk, (0, output_size - len(output_chunk))) | |
| if __name__ == "__main__": | |
| client = AudioStreamingClient() | |
| client.start() |