File size: 5,881 Bytes
0543c56
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import asyncio
import websockets
from collections import deque
import time
import json
import base64
import uuid
import numpy as np
import sys

# --- Configuration ---
# Address of the WebSocket server that receives audio chunks from the client.
RECEIVE_HOST = "localhost"
RECEIVE_PORT = 8765

# Address of the WebSocket server that sends the delayed chunks back.
SEND_HOST = "localhost"
SEND_PORT = 8766

# The target delay in seconds between receiving a chunk and echoing it back.
TARGET_DELAY_SECONDS = 2
# Expected interval, in seconds, between chunks sent by the client (1000 ms).
# The send handler buffers TARGET_DELAY_SECONDS / CHUNK_SEND_INTERVAL chunks
# before it starts echoing; the interval itself is not enforced by the server.
CHUNK_SEND_INTERVAL = 1.0

# --- Server Logic ---
def _rms_loudness(audio_bytes):
    """Return the RMS amplitude of *audio_bytes* read as int16 samples.

    A trailing odd byte is ignored so the buffer length is a multiple of the
    2-byte sample size; an empty buffer yields 0.0.
    """
    usable_length = len(audio_bytes) - (len(audio_bytes) % 2)
    samples = np.frombuffer(audio_bytes[:usable_length], dtype=np.int16)
    if samples.size == 0:
        return 0.0
    # Widen to float64 before squaring so int16 values cannot overflow.
    return float(np.sqrt(np.mean(np.square(samples.astype(np.float64)))))


async def receive_from_client_handler(websocket, internal_audio_queue):
    """Receive audio chunks from one client and enqueue them with metadata.

    Each incoming message is expected to be a JSON object whose ``audioData``
    field holds base64-encoded audio. The decoded bytes are read as int16
    samples to compute an RMS loudness. The parsed object is then tagged with
    a fresh ``id`` (UUID4 string) and ``loudness`` (float) and put on
    *internal_audio_queue*.

    A message that is not valid JSON, lacks ``audioData``, or carries invalid
    base64 is logged and skipped, so one bad frame does not end the
    connection.

    Args:
        websocket: Connection object; it is iterated with ``async for`` to
            obtain messages and its ``remote_address`` is logged.
        internal_audio_queue: Queue that receives the processed message
            dicts via ``await put(...)``.

    Raises:
        asyncio.CancelledError: Re-raised after logging so the task that runs
            this handler is still reported as cancelled.
    """
    print(f"[{time.strftime('%H:%M:%S')}] Receive server: New client connected from {websocket.remote_address}")
    try:
        async for message in websocket:
            print(f"[{time.strftime('%H:%M:%S')}] Receive server: Message received")
            try:
                # The client sends a JSON string carrying base64 audio.
                message_object = json.loads(message)
                audio_bytes = base64.b64decode(message_object['audioData'])
            except (KeyError, TypeError, ValueError) as e:
                # JSONDecodeError and binascii.Error are ValueError subclasses;
                # TypeError covers JSON payloads that are not objects.
                print(f"[{time.strftime('%H:%M:%S')}] Receive server: Skipping malformed message: {e!r}")
                continue

            # Add loudness and a new ID to the message object to send back.
            message_object['id'] = str(uuid.uuid4())
            message_object['loudness'] = _rms_loudness(audio_bytes)

            await internal_audio_queue.put(message_object)
            # .get() keeps a missing chunkNumber from raising after the chunk
            # has already been queued.
            print(f"[{time.strftime('%H:%M:%S')}] Receive server: Received chunk #{message_object.get('chunkNumber')} with ID {message_object['id']}. Queue size: {internal_audio_queue.qsize()}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Connection closed with code {e.code}")
    except asyncio.CancelledError:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Task cancelled.")
        # Swallowing cancellation would hide it from whoever cancelled the task.
        raise
    except Exception as e:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Error: {e}")
    finally:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server: Client disconnected.")

async def send_to_client_handler(websocket, internal_audio_queue):
    """Send queued audio chunks to one client after an initial buffering delay.

    The handler first collects ``TARGET_DELAY_SECONDS / CHUNK_SEND_INTERVAL``
    chunks from *internal_audio_queue* into a per-connection buffer. After
    that, every new chunk taken from the queue is appended to the buffer and
    the oldest buffered chunk is sent to the client as a JSON string.

    Chunks still held in the per-connection buffer when the connection ends
    are not returned to the queue.

    Args:
        websocket: Connection object; chunks are written with
            ``await websocket.send(...)`` and its ``remote_address`` is logged.
        internal_audio_queue: Queue of JSON-serialisable chunk dicts, read
            with ``await get()``.

    Raises:
        asyncio.CancelledError: Re-raised after logging so the task that runs
            this handler is still reported as cancelled.
    """
    print(f"[{time.strftime('%H:%M:%S')}] Send server: New client connected from {websocket.remote_address}")
    internal_audio_buffer = deque()
    # Number of chunks that must be buffered before echoing starts; computed
    # once instead of on every pass through the fill loop.
    required_chunks = TARGET_DELAY_SECONDS / CHUNK_SEND_INTERVAL

    try:
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Waiting for buffer to fill...")
        # Fill the initial buffer. This blocks until enough chunks are available.
        while len(internal_audio_buffer) < required_chunks:
            chunk = await internal_audio_queue.get()
            internal_audio_buffer.append(chunk)
            print(f"[{time.strftime('%H:%M:%S')}] Send server: Filling buffer... Current size: {len(internal_audio_buffer)}")

        print(f"[{time.strftime('%H:%M:%S')}] Send server: Buffer filled. Starting to echo audio back.")

        # Main loop: each newly arrived chunk pushes the oldest one out.
        while True:
            # Blocks until a new chunk is available.
            new_chunk = await internal_audio_queue.get()

            internal_audio_buffer.append(new_chunk)
            chunk_to_send = internal_audio_buffer.popleft()

            await websocket.send(json.dumps(chunk_to_send))
            # .get() keeps a chunk without these keys from raising after it
            # has already been sent, which would end the handler.
            print(f"[{time.strftime('%H:%M:%S')}] Send server: Sent chunk #{chunk_to_send.get('chunkNumber')} with ID {chunk_to_send.get('id')}. Buffer size: {len(internal_audio_buffer)}")

    except websockets.exceptions.ConnectionClosed as e:
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Connection closed with code {e.code}")
    except asyncio.CancelledError:
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Task cancelled.")
        # Swallowing cancellation would hide it from whoever cancelled the task.
        raise
    finally:
        print(f"[{time.strftime('%H:%M:%S')}] Send server: Client disconnected.")

# --- Main Server Startup ---
async def main():
    """Start both WebSocket servers and run until they are closed.

    A single ``asyncio.Queue`` is created and shared by the two handlers: the
    receive server puts processed chunks on it and the send server takes
    them off. Both servers are opened with ``async with`` so they are closed
    when this coroutine exits, including on cancellation or when the second
    server fails to start.
    """
    print(f"[{time.strftime('%H:%M:%S')}] Starting servers...")

    # Create the internal queue for communication between servers
    internal_audio_queue = asyncio.Queue()

    async with websockets.serve(
        lambda ws: receive_from_client_handler(ws, internal_audio_queue),
        RECEIVE_HOST,
        RECEIVE_PORT,
    ) as receive_server:
        print(f"[{time.strftime('%H:%M:%S')}] Receive server started on ws://{RECEIVE_HOST}:{RECEIVE_PORT}")

        async with websockets.serve(
            lambda ws: send_to_client_handler(ws, internal_audio_queue),
            SEND_HOST,
            SEND_PORT,
        ) as send_server:
            print(f"[{time.strftime('%H:%M:%S')}] Send server started on ws://{SEND_HOST}:{SEND_PORT}")

            # Block here until both servers have been closed.
            await asyncio.gather(receive_server.wait_closed(), send_server.wait_closed())

if __name__ == "__main__":
    # sys.platform is "win32" on Windows. A substring test for "win" also
    # matches "darwin" (macOS) and "cygwin", where asyncio does not provide
    # WindowsSelectorEventLoopPolicy, so compare against the exact value.
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nServer shutting down gracefully...")