# Echo-Audio-stream / EchoAudio2.py
# Name108's picture
# Rename EchoAudio.py to EchoAudio2.py
# 0682e70 verified
import asyncio
import websockets
from collections import deque
import time
import json
import base64
import uuid
import numpy as np
import sys
# --- Configuration ---
# Host/port of the server that accepts incoming audio chunks from the client.
RECEIVE_HOST: str = "localhost"
RECEIVE_PORT: int = 8765
# Host/port of the server that streams the delayed audio back to the client.
SEND_HOST: str = "localhost"
SEND_PORT: int = 8766
# The target delay in seconds. Together with CHUNK_SEND_INTERVAL it determines
# how many chunks the send handler buffers before it starts echoing.
TARGET_DELAY_SECONDS: int = 2
# The client sends a chunk of data every 1 second (1000 ms).
CHUNK_SEND_INTERVAL: float = 1.0
# --- Server Logic ---
def _process_audio_message(message):
    """
    Parse one client message and annotate it with an ID and a loudness value.

    The message must be a JSON object whose 'audioData' field holds
    base64-encoded audio; the decoded bytes are interpreted as int16 samples.

    Returns the parsed object with two extra keys: 'id' (a fresh UUID4 string)
    and 'loudness' (the RMS of the samples as a float, 0.0 for empty audio).

    Raises ValueError (bad JSON or bad base64), KeyError (no 'audioData') or
    TypeError (payload is not a JSON object / field has the wrong type).
    """
    message_object = json.loads(message)
    audio_bytes = base64.b64decode(message_object['audioData'])
    # int16 samples are two bytes wide, so drop a dangling odd byte if present.
    usable_length = len(audio_bytes) - (len(audio_bytes) % 2)
    audio_samples = np.frombuffer(audio_bytes[:usable_length], dtype=np.int16)
    if audio_samples.size > 0:
        # Square in float64 so large int16 values cannot overflow.
        rms = float(np.sqrt(np.mean(np.square(audio_samples.astype(np.float64)))))
    else:
        rms = 0.0
    message_object['id'] = str(uuid.uuid4())
    message_object['loudness'] = rms
    return message_object


async def receive_from_client_handler(websocket, internal_audio_queue):
    """
    Handles a single WebSocket connection for receiving audio from the client.

    Every incoming message is parsed, tagged with a new ID and its RMS
    loudness, and put on `internal_audio_queue`. A malformed message is logged
    and skipped so that one bad payload does not end the whole connection.

    Args:
        websocket: The client connection; iterated asynchronously for messages.
        internal_audio_queue: Queue (awaitable `put`) receiving processed chunks.

    Raises:
        asyncio.CancelledError: Re-raised after logging so that cancellation
            propagates to the caller instead of being silently swallowed.
    """
    print(f"[{time.strftime('%H:%M:%S')}] Receive server: New client connected from {websocket.remote_address}")
    try:
        async for message in websocket:
            print(f"[{time.strftime('%H:%M:%S')}] Receive server: Message received")
            try:
                message_object = _process_audio_message(message)
            except (ValueError, KeyError, TypeError) as e:
                # json.JSONDecodeError and binascii.Error are both ValueErrors.
                print(f"[{time.strftime('%H:%M:%S')}] Receive server: Skipping malformed message: {e!r}")
                continue
            await internal_audio_queue.put(message_object)
            # .get(): a missing 'chunkNumber' must not fail an already-queued chunk.
            print(f"[{time.strftime('%H:%M:%S')}] Receive server: Received chunk #{message_object.get('chunkNumber', '?')} with ID {message_object['id']}. Queue size: {internal_audio_queue.qsize()}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Connection closed with code {e.code}")
    except asyncio.CancelledError:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Task cancelled.")
        raise
    except Exception as e:
        # Connection-level boundary: log unexpected errors rather than crash the server.
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Error: {e}")
    finally:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Client disconnected.")
async def send_to_client_handler(websocket, internal_audio_queue):
    """
    Handles a single WebSocket connection for sending delayed audio to the client.

    The handler first pulls TARGET_DELAY_SECONDS / CHUNK_SEND_INTERVAL chunks
    from `internal_audio_queue` into a local buffer without sending anything.
    After that, each newly received chunk is appended to the buffer and the
    oldest buffered chunk is sent as JSON, so output lags input by the
    buffered number of chunks.

    Args:
        websocket: The client connection; must provide an awaitable `send`.
        internal_audio_queue: Queue (awaitable `get`) of processed chunk dicts.

    Raises:
        asyncio.CancelledError: Re-raised after logging so that cancellation
            propagates to the caller instead of being silently swallowed.

    NOTE: the buffer is local to this call, so chunks still buffered when the
    connection ends are discarded.
    """
    print(f"[{time.strftime('%H:%M:%S')}] Send server: New client connected from {websocket.remote_address}")
    internal_audio_buffer = deque()
    # Loop-invariant: number of chunks to hold back before echoing starts.
    chunks_to_prefill = TARGET_DELAY_SECONDS / CHUNK_SEND_INTERVAL
    try:
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Waiting for buffer to fill...")
        # Fill the initial buffer. This blocks until enough chunks are available.
        while len(internal_audio_buffer) < chunks_to_prefill:
            chunk = await internal_audio_queue.get()
            internal_audio_buffer.append(chunk)
            print(f"[{time.strftime('%H:%M:%S')}] Send server: Filling buffer... Current size: {len(internal_audio_buffer)}")
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Buffer filled. Starting to echo audio back.")
        # Main loop: one chunk in, oldest chunk out, so the buffer size stays constant.
        while True:
            new_chunk = await internal_audio_queue.get()
            internal_audio_buffer.append(new_chunk)
            chunk_to_send = internal_audio_buffer.popleft()
            await websocket.send(json.dumps(chunk_to_send))
            # .get(): a missing key must not fail the loop after a successful send.
            print(f"[{time.strftime('%H:%M:%S')}] Send server: Sent chunk #{chunk_to_send.get('chunkNumber', '?')} with ID {chunk_to_send.get('id', '?')}. Buffer size: {len(internal_audio_buffer)}")
    except asyncio.CancelledError:
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Task cancelled.")
        raise
    except websockets.exceptions.ConnectionClosed as e:
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Connection closed with code {e.code}")
    except Exception as e:
        # Mirrors the receive handler: log unexpected errors at the connection boundary.
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Error: {e}")
    finally:
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Client disconnected.")
# --- Main Server Startup ---
async def main():
    """
    Starts both WebSocket servers concurrently and keeps them running.

    A single asyncio.Queue is created and shared by the two handlers: the
    receive server puts processed chunks on it and the send server takes them
    off. Both servers are opened as async context managers so that they are
    closed when this coroutine exits for any reason (cancellation on Ctrl+C,
    or a failure to bind the second port after the first is already listening).
    """
    print(f"[{time.strftime('%H:%M:%S')}] Starting servers...")
    # Create the internal queue for communication between servers
    internal_audio_queue = asyncio.Queue()
    async with websockets.serve(
        lambda ws: receive_from_client_handler(ws, internal_audio_queue),
        RECEIVE_HOST,
        RECEIVE_PORT,
    ) as receive_server:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server started on ws://{RECEIVE_HOST}:{RECEIVE_PORT}")
        async with websockets.serve(
            lambda ws: send_to_client_handler(ws, internal_audio_queue),
            SEND_HOST,
            SEND_PORT,
        ) as send_server:
            print(f"[{time.strftime('%H:%M:%S')}] Send server started on ws://{SEND_HOST}:{SEND_PORT}")
            # Block until both servers have been closed.
            await asyncio.gather(receive_server.wait_closed(), send_server.wait_closed())
if __name__ == "__main__":
    # Compare against the exact Windows identifier. A substring test such as
    # `"win" in sys.platform` also matches "darwin" (macOS) and "cygwin", where
    # asyncio.WindowsSelectorEventLoopPolicy does not exist and the lookup
    # raises AttributeError before the server can start.
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nServer shutting down gracefully...")