"""Two-port WebSocket audio delay server.

A "receive" server accepts JSON messages carrying base64-encoded audio,
annotates each one with a fresh ID and its RMS loudness, and places it on an
in-process queue. A "send" server drains that queue through a small buffer so
that chunks are echoed back to its client roughly TARGET_DELAY_SECONDS after
they arrived.
"""
import asyncio
import base64
import json
import sys
import time
import uuid
from collections import deque

import numpy as np
import websockets

# --- Configuration ---
RECEIVE_HOST = "localhost"
RECEIVE_PORT = 8765
SEND_HOST = "localhost"
SEND_PORT = 8766

# The target delay in seconds.
TARGET_DELAY_SECONDS = 2
# The client sends a chunk of data every 1 second (1000ms).
CHUNK_SEND_INTERVAL = 1.0


# --- Helpers ---
def _log(message):
    """Print *message* prefixed with the current wall-clock time as [HH:MM:SS]."""
    print(f"[{time.strftime('%H:%M:%S')}] {message}")


def _compute_loudness(audio_bytes):
    """Return the RMS of *audio_bytes* interpreted as int16 samples.

    A trailing odd byte is dropped so the buffer length is a multiple of the
    2-byte sample size. An empty buffer yields 0.0.
    """
    # NOTE(review): np.int16 uses the machine's native byte order; this assumes
    # the client sends PCM in that same order -- confirm against the client.
    usable_length = len(audio_bytes) - (len(audio_bytes) % 2)
    samples = np.frombuffer(audio_bytes[:usable_length], dtype=np.int16)
    if samples.size == 0:
        return 0.0
    # Widen to float64 before squaring so int16 values cannot overflow.
    return float(np.sqrt(np.mean(np.square(samples.astype(np.float64)))))


def _annotate_message(raw_message):
    """Parse one client message and add the server-side 'id' and 'loudness' fields.

    Args:
        raw_message: JSON text for an object with at least the keys
            'audioData' (base64 audio) and 'chunkNumber'.

    Returns:
        The parsed dict with 'id' (a new UUID4 string) and 'loudness'
        (float RMS) added.

    Raises:
        ValueError: the JSON or the base64 payload is invalid.
        KeyError: 'audioData' or 'chunkNumber' is missing.
        TypeError: the JSON is not an object, or 'audioData' is not decodable.
    """
    message_object = json.loads(raw_message)
    if not isinstance(message_object, dict):
        raise TypeError("message is not a JSON object")
    # Both handlers log message['chunkNumber']; reject the message here rather
    # than letting a queued chunk blow up later in the send handler.
    if "chunkNumber" not in message_object:
        raise KeyError("chunkNumber")

    audio_bytes = base64.b64decode(message_object["audioData"])

    # Add loudness and a new ID to the message object to send back.
    message_object["id"] = str(uuid.uuid4())
    message_object["loudness"] = _compute_loudness(audio_bytes)
    return message_object


# --- Server Logic ---
async def receive_from_client_handler(websocket, internal_audio_queue):
    """Receive audio messages from one client and enqueue the annotated chunks.

    Each incoming message is parsed, tagged with an ID and its loudness, and
    put on *internal_audio_queue*. A malformed message is logged and skipped;
    it does not end the connection.

    Args:
        websocket: The connection object supplied by the websockets server.
        internal_audio_queue: asyncio.Queue shared with the send handler.
    """
    _log(f"Receive server: New client connected from {websocket.remote_address}")
    try:
        async for message in websocket:
            _log("Receive server: Message received")
            try:
                message_object = _annotate_message(message)
            except (ValueError, KeyError, TypeError) as e:
                _log(f"Receive server: Skipping malformed message: {e!r}")
                continue

            await internal_audio_queue.put(message_object)
            _log(
                f"Receive server: Received chunk #{message_object['chunkNumber']} "
                f"with ID {message_object['id']}. "
                f"Queue size: {internal_audio_queue.qsize()}"
            )
    except websockets.exceptions.ConnectionClosed as e:
        _log(f"Receive server: Connection closed with code {e.code}")
    except asyncio.CancelledError:
        _log("Receive server: Task cancelled.")
        # Cancellation must propagate so the caller sees the task as cancelled.
        raise
    except Exception as e:
        _log(f"Receive server: Error: {e}")
    finally:
        _log("Receive server: Client disconnected.")


async def send_to_client_handler(websocket, internal_audio_queue):
    """Send queued audio chunks to one client after a fixed buffering delay.

    The handler first collects TARGET_DELAY_SECONDS / CHUNK_SEND_INTERVAL
    chunks without sending anything. From then on, every newly queued chunk
    pushes the oldest buffered chunk out to the client, so the buffer size
    stays constant. Chunks still in the local buffer when the handler exits
    are discarded.

    Args:
        websocket: The connection object supplied by the websockets server.
        internal_audio_queue: asyncio.Queue fed by the receive handler.
    """
    _log(f"Send server: New client connected from {websocket.remote_address}")
    # Number of chunks to hold back; computed once instead of on every check.
    buffer_target = TARGET_DELAY_SECONDS / CHUNK_SEND_INTERVAL
    internal_audio_buffer = deque()
    try:
        _log("Send server: Waiting for buffer to fill...")
        # Fill the initial buffer. This blocks until enough chunks are available.
        while len(internal_audio_buffer) < buffer_target:
            chunk = await internal_audio_queue.get()
            internal_audio_buffer.append(chunk)
            _log(
                "Send server: Filling buffer... "
                f"Current size: {len(internal_audio_buffer)}"
            )

        _log("Send server: Buffer filled. Starting to echo audio back.")

        # Main loop: each new chunk releases the oldest buffered one.
        while True:
            # Blocks until a new chunk is available.
            new_chunk = await internal_audio_queue.get()
            internal_audio_buffer.append(new_chunk)
            chunk_to_send = internal_audio_buffer.popleft()

            await websocket.send(json.dumps(chunk_to_send))
            _log(
                f"Send server: Sent chunk #{chunk_to_send['chunkNumber']} "
                f"with ID {chunk_to_send['id']}. "
                f"Buffer size: {len(internal_audio_buffer)}"
            )
    except websockets.exceptions.ConnectionClosed as e:
        _log(f"Send server: Connection closed with code {e.code}")
    except asyncio.CancelledError:
        _log("Send server: Task cancelled.")
        # Cancellation must propagate so the caller sees the task as cancelled.
        raise
    except Exception as e:
        # Mirrors the receive handler so unexpected failures are logged the same way.
        _log(f"Send server: Error: {e}")
    finally:
        _log("Send server: Client disconnected.")


# --- Main Server Startup ---
async def main():
    """Start both WebSocket servers and run until they are closed.

    The servers share one asyncio.Queue. They are opened as async context
    managers so that both are closed when this coroutine exits, including
    when it is cancelled during shutdown.
    """
    _log("Starting servers...")

    # Create the internal queue for communication between servers.
    internal_audio_queue = asyncio.Queue()

    async with websockets.serve(
        lambda ws: receive_from_client_handler(ws, internal_audio_queue),
        RECEIVE_HOST,
        RECEIVE_PORT,
    ) as receive_server:
        _log(f"Receive server started on ws://{RECEIVE_HOST}:{RECEIVE_PORT}")

        async with websockets.serve(
            lambda ws: send_to_client_handler(ws, internal_audio_queue),
            SEND_HOST,
            SEND_PORT,
        ) as send_server:
            _log(f"Send server started on ws://{SEND_HOST}:{SEND_PORT}")

            await asyncio.gather(
                receive_server.wait_closed(),
                send_server.wait_closed(),
            )


if __name__ == "__main__":
    # Prefix match, not substring: "win" in sys.platform is also true for
    # "darwin" (macOS) and "cygwin", where the Windows policy does not exist.
    if sys.platform.startswith("win"):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nServer shutting down gracefully...")