# app.py
import asyncio
import base64
import json
import logging
from collections import deque
from aiohttp import web
import numpy as np
import sys
import time
import uuid

# --- Configuration ---
HOST = "0.0.0.0"
PORT = 7860
# The target delay in seconds.
TARGET_DELAY_SECONDS = 2
# The client sends a chunk of data every 1 second (1000ms).
CHUNK_SEND_INTERVAL = 1.0

# Configure logging
# NOTE(review): the format already prepends %(asctime)s, and every message
# below also embeds its own HH:MM:SS stamp, so log lines carry two timestamps.
# The message texts are left untouched here to keep log output unchanged.
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(message)s')


def _rms_loudness(audio_bytes):
    """Return the RMS of *audio_bytes* read as int16 samples (0.0 if empty)."""
    audio_samples = np.frombuffer(audio_bytes, dtype=np.int16)
    if audio_samples.size == 0:
        return 0.0
    # Square in float64 so int16 values cannot overflow.
    return float(np.sqrt(np.mean(np.square(audio_samples.astype(np.float64)))))


async def websocket_handler(request):
    """
    Handles a single WebSocket connection for both sending and receiving audio.

    Each TEXT message is parsed as JSON and must provide 'chunkNumber' and
    'audioData' (base64). The decoded bytes are read as int16 samples, their
    RMS is stored under 'loudness', and the message is queued. Once the queue
    holds ``buffer_size`` chunks, the oldest one is sent back as JSON for
    every newly received chunk.

    Args:
        request: The incoming aiohttp request to upgrade to a WebSocket.

    Returns:
        The ``web.WebSocketResponse`` that served the connection.
    """
    websocket = web.WebSocketResponse()
    await websocket.prepare(request)

    client_id = str(uuid.uuid4())
    logging.info(f"[{time.strftime('%H:%M:%S')}] New client connected: {client_id}")

    # Use a deque as a buffer to store audio chunks for the echo delay
    internal_audio_buffer = deque()

    # Calculate the number of chunks needed to fill the buffer
    buffer_size = int(TARGET_DELAY_SECONDS / CHUNK_SEND_INTERVAL)

    try:
        # Main loop to continuously receive data from the client
        async for message in websocket:
            if message.type == web.WSMsgType.TEXT:
                # The client sends a JSON string, so we need to parse it.
                try:
                    message_object = json.loads(message.data)
                    logging.info(f"[{time.strftime('%H:%M:%S')}] Received chunk #{message_object['chunkNumber']}")

                    # Decode the base64 audio data back to a binary array
                    audio_data_base64 = message_object['audioData']
                    audio_bytes = base64.b64decode(audio_data_base64)

                    # Calculate loudness (RMS) of the audio chunk
                    message_object['loudness'] = _rms_loudness(audio_bytes)

                    # Add the new chunk to the buffer
                    internal_audio_buffer.append(message_object)

                    # If the buffer has enough chunks, pop the oldest one and send it back
                    if len(internal_audio_buffer) >= buffer_size:
                        chunk_to_send = internal_audio_buffer.popleft()
                        await websocket.send_json(chunk_to_send)
                        logging.info(f"[{time.strftime('%H:%M:%S')}] Sent echoed chunk #{chunk_to_send['chunkNumber']}")

                except json.JSONDecodeError:
                    logging.error(f"[{time.strftime('%H:%M:%S')}] Error decoding JSON from message: {message.data}")
                except Exception as e:
                    # Deliberately broad: one malformed chunk (missing key, bad
                    # base64, odd byte count) must not end the connection.
                    logging.error(f"[{time.strftime('%H:%M:%S')}] An error occurred in the main loop: {e}")

            elif message.type == web.WSMsgType.ERROR:
                logging.error(f"[{time.strftime('%H:%M:%S')}] WebSocket received an error: {websocket.exception()}")

    except asyncio.CancelledError:
        logging.info(f"[{time.strftime('%H:%M:%S')}] WebSocket connection closed unexpectedly.")
        # Propagate cancellation so the caller sees the task as cancelled.
        raise
    finally:
        logging.info(f"[{time.strftime('%H:%M:%S')}] Client disconnected: {client_id}")
        await websocket.close()

    # aiohttp handlers must return a response object.
    return websocket


async def serve_index_html(request):
    """
    HTTP handler to serve the index.html file.

    Reads 'index.html' from the current working directory and returns it as
    text/html, or a 404 response when the file does not exist.
    """
    try:
        with open('index.html', 'r', encoding='utf-8') as f:
            content = f.read()
        return web.Response(text=content, content_type='text/html')
    except FileNotFoundError:
        return web.Response(text="index.html not found", status=404)


async def main():
    """
    Main function to run the aiohttp application.

    Registers the HTML and WebSocket routes, starts a TCP site on HOST:PORT,
    and then waits forever on an event that is never set.
    """
    app = web.Application()

    # Route for the HTML file
    app.router.add_get('/', serve_index_html)

    # Route for the WebSocket connection
    app.router.add_get('/ws', websocket_handler)

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, HOST, PORT)

    logging.info(f"[{time.strftime('%H:%M:%S')}] Starting server on http://{HOST}:{PORT}")
    logging.info(f"[{time.strftime('%H:%M:%S')}] WebSocket endpoint: ws://{HOST}:{PORT}/ws")

    await site.start()

    # Keep the server running until terminated
    await asyncio.Event().wait()


if __name__ == '__main__':
    # Use startswith: a substring test for "win" also matches "darwin" (macOS),
    # where WindowsSelectorEventLoopPolicy does not exist.
    if sys.platform.startswith("win"):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nServer shutting down gracefully...")