Spaces:
Sleeping
Sleeping
| import asyncio | |
| import websockets | |
| from collections import deque | |
| import time | |
| import json | |
| import base64 | |
| import uuid | |
| import numpy as np | |
| import sys | |
# --- Configuration ---
# Address the receive server binds to (clients push audio chunks here).
RECEIVE_HOST = "localhost"
RECEIVE_PORT = 8765

# Address the send server binds to (clients read delayed chunks here).
SEND_HOST = "localhost"
SEND_PORT = 8766

# Expected client cadence: one chunk of data every 1 second (1000ms).
CHUNK_SEND_INTERVAL = 1.0

# How long, in seconds, audio is held back before being echoed.
# Divided by CHUNK_SEND_INTERVAL to get the number of chunks to buffer.
TARGET_DELAY_SECONDS = 2
| # --- Server Logic --- | |
def _compute_rms_loudness(audio_bytes):
    """Return the RMS level of a byte buffer read as int16 samples.

    A trailing odd byte is dropped so the buffer splits evenly into 2-byte
    samples (read in the machine's native byte order). An empty buffer
    yields 0.0.
    """
    usable_length = (len(audio_bytes) // 2) * 2
    samples = np.frombuffer(audio_bytes[:usable_length], dtype=np.int16)
    if samples.size == 0:
        return 0.0
    # Widen to float64 before squaring so the squares cannot wrap around int16.
    return float(np.sqrt(np.mean(np.square(samples.astype(np.float64)))))


async def receive_from_client_handler(websocket, internal_audio_queue):
    """Receive audio chunks from one client and enqueue them for sending.

    Every message is parsed as a JSON object whose ``audioData`` field holds
    base64-encoded audio. The handler computes the chunk's RMS loudness, adds
    it to the object as ``loudness`` together with a fresh UUID under ``id``,
    and puts the object on ``internal_audio_queue``.

    A message that cannot be parsed (invalid JSON, not an object, missing
    ``audioData``, invalid base64) is logged and skipped; the connection
    stays open so later chunks are still processed.

    Args:
        websocket: The client connection; iterated for incoming messages.
        internal_audio_queue: Queue the processed message objects are put on.

    Raises:
        asyncio.CancelledError: Re-raised after logging so that the task is
            reported as cancelled instead of finishing normally.
    """
    print(f"[{time.strftime('%H:%M:%S')}] Receive server: New client connected from {websocket.remote_address}")
    try:
        async for message in websocket:
            print(f"[{time.strftime('%H:%M:%S')}] Receive server: Message received")
            try:
                # The client sends a JSON string carrying base64 audio.
                message_object = json.loads(message)
                audio_bytes = base64.b64decode(message_object['audioData'])
            except (ValueError, KeyError, TypeError) as e:
                # ValueError covers bad JSON and bad base64; KeyError a missing
                # field; TypeError a payload that is not a JSON object.
                print(f"[{time.strftime('%H:%M:%S')}] Receive server: Skipping malformed message: {e!r}")
                continue

            # Add loudness and a new ID to the message object to send back.
            message_object['id'] = str(uuid.uuid4())
            message_object['loudness'] = _compute_rms_loudness(audio_bytes)
            await internal_audio_queue.put(message_object)
            # .get() keeps a missing chunkNumber from failing the handler
            # after the chunk has already been queued.
            print(f"[{time.strftime('%H:%M:%S')}] Receive server: Received chunk #{message_object.get('chunkNumber', '?')} with ID {message_object['id']}. Queue size: {internal_audio_queue.qsize()}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Connection closed with code {e.code}")
    except asyncio.CancelledError:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Task cancelled.")
        # Cancellation must propagate; swallowing it hides it from the caller.
        raise
    except Exception as e:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Error: {e}")
    finally:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Client disconnected.")
async def send_to_client_handler(websocket, internal_audio_queue):
    """Send queued audio chunks to one client after a fixed chunk delay.

    The handler first pulls TARGET_DELAY_SECONDS / CHUNK_SEND_INTERVAL chunks
    from ``internal_audio_queue`` into a local buffer. After that, every new
    chunk taken from the queue is appended to the buffer and the oldest
    buffered chunk is serialized to JSON and sent, so the buffer length stays
    constant and each chunk goes out that many chunks late.

    Args:
        websocket: The client connection the delayed chunks are sent on.
        internal_audio_queue: Queue the chunks (dicts) are read from.

    Raises:
        asyncio.CancelledError: Re-raised after logging so that the task is
            reported as cancelled instead of finishing normally.
    """
    print(f"[{time.strftime('%H:%M:%S')}] Send server: New client connected from {websocket.remote_address}")
    internal_audio_buffer = deque()
    try:
        # Loop-invariant: number of chunks held back to produce the delay.
        chunks_to_hold = TARGET_DELAY_SECONDS / CHUNK_SEND_INTERVAL

        print(f"[{time.strftime('%H:%M:%S')}] Send server: Waiting for buffer to fill...")
        # Fill the initial buffer. This blocks until enough chunks are available.
        while len(internal_audio_buffer) < chunks_to_hold:
            internal_audio_buffer.append(await internal_audio_queue.get())
            print(f"[{time.strftime('%H:%M:%S')}] Send server: Filling buffer... Current size: {len(internal_audio_buffer)}")
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Buffer filled. Starting to echo audio back.")

        # Steady state: one chunk in, the oldest chunk out.
        while True:
            # Blocks until a new chunk is available.
            internal_audio_buffer.append(await internal_audio_queue.get())
            chunk_to_send = internal_audio_buffer.popleft()
            await websocket.send(json.dumps(chunk_to_send))
            # .get() keeps a chunk lacking these keys from failing the
            # handler after the chunk has already been sent.
            print(f"[{time.strftime('%H:%M:%S')}] Send server: Sent chunk #{chunk_to_send.get('chunkNumber', '?')} with ID {chunk_to_send.get('id', '?')}. Buffer size: {len(internal_audio_buffer)}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Connection closed with code {e.code}")
    except asyncio.CancelledError:
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Task cancelled.")
        # Cancellation must propagate; swallowing it hides it from the caller.
        raise
    finally:
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Client disconnected.")
| # --- Main Server Startup --- | |
async def main():
    """
    Launch the receive and send WebSocket servers and wait until both close.

    Both servers share one asyncio.Queue: the receive handler puts chunks on
    it and the send handler takes them off.
    """
    print(f"[{time.strftime('%H:%M:%S')}] Starting servers...")

    # Shared hand-off point between the two servers.
    internal_audio_queue = asyncio.Queue()

    # Per-connection entry points that bind the shared queue to each handler.
    def on_receive_connection(ws):
        return receive_from_client_handler(ws, internal_audio_queue)

    def on_send_connection(ws):
        return send_to_client_handler(ws, internal_audio_queue)

    receive_server = await websockets.serve(on_receive_connection, RECEIVE_HOST, RECEIVE_PORT)
    print(f"[{time.strftime('%H:%M:%S')}] Receive server started on ws://{RECEIVE_HOST}:{RECEIVE_PORT}")

    send_server = await websockets.serve(on_send_connection, SEND_HOST, SEND_PORT)
    print(f"[{time.strftime('%H:%M:%S')}] Send server started on ws://{SEND_HOST}:{SEND_PORT}")

    # Stay alive for as long as either server is still open.
    await asyncio.gather(
        receive_server.wait_closed(),
        send_server.wait_closed(),
    )
if __name__ == "__main__":
    # Match the Windows platform identifier exactly. A substring test such as
    # `"win" in sys.platform` is also true for "darwin" (macOS) and "cygwin",
    # where asyncio.WindowsSelectorEventLoopPolicy is not defined, so the
    # script would fail with AttributeError before the servers start.
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nServer shutting down gracefully...")