File size: 12,847 Bytes
0f4801b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
"""

Audio Processing Utilities for Real-time Microphone Streaming

Handles audio data encoding, decoding, and processing for Gradio backend.

"""

import base64
import numpy as np
import threading
import queue
import time
import logging
from datetime import datetime
from typing import Optional, Dict, List, Tuple

logger = logging.getLogger(__name__)

class AudioBuffer:
    """Bounded, thread-safe FIFO of audio chunks with simple counters.

    Chunks live in a ``queue.Queue`` capped at ``max_size``. When the queue is
    full, new chunks are rejected (never blocked on) and counted as dropped.
    """

    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self.buffer = queue.Queue(maxsize=max_size)
        self.lock = threading.Lock()
        # Counters reported by get_stats(); only mutated while holding self.lock.
        self._stats = dict(total_chunks=0, dropped_chunks=0, buffer_size=0)

    def add_chunk(self, audio_data: bytes, device_id: str, timestamp: float) -> bool:
        """Enqueue one chunk without blocking.

        Args:
            audio_data: Raw audio payload.
            device_id: Identifier of the device that produced the chunk.
            timestamp: Caller-supplied timestamp, stored unchanged.

        Returns:
            True if the chunk was queued, False if the buffer was full and the
            chunk was dropped.
        """
        entry = dict(
            audio_data=audio_data,
            device_id=device_id,
            timestamp=timestamp,
            server_received=time.time(),
        )
        try:
            self.buffer.put_nowait(entry)
        except queue.Full:
            with self.lock:
                self._stats['dropped_chunks'] += 1
            logger.warning(f"Audio buffer full, dropped chunk from device {device_id}")
            return False

        with self.lock:
            self._stats['total_chunks'] += 1
            self._stats['buffer_size'] = self.buffer.qsize()
        return True

    def get_chunk(self, timeout: float = 0.1) -> Optional[Dict]:
        """Pop the oldest chunk, waiting up to ``timeout`` seconds.

        Returns:
            The chunk dict, or None if nothing arrived before the timeout.
        """
        try:
            entry = self.buffer.get(timeout=timeout)
        except queue.Empty:
            return None

        with self.lock:
            self._stats['buffer_size'] = self.buffer.qsize()
        return entry

    def get_stats(self) -> Dict:
        """Return a snapshot (shallow copy) of the buffer counters."""
        with self.lock:
            snapshot = dict(self._stats)
        return snapshot

    def clear(self):
        """Discard every queued chunk and reset the reported buffer size."""
        with self.lock:
            drained = False
            while not drained:
                try:
                    self.buffer.get_nowait()
                except queue.Empty:
                    drained = True
            self._stats['buffer_size'] = 0

class AudioProcessor:
    """Stateless helpers for base64 transport and 16-bit PCM conversion.

    PCM data is interpreted as signed 16-bit samples (``np.int16``, native
    byte order). Float samples are scaled by 32768 in both directions, so an
    int16 -> float -> int16 round trip reproduces the original bytes exactly.
    """

    @staticmethod
    def decode_base64_audio(base64_data: str) -> Optional[bytes]:
        """Decode a base64 string into raw bytes.

        All whitespace (newlines, spaces, tabs) is removed and missing ``=``
        padding is restored before decoding.

        Args:
            base64_data: Base64 text, possibly wrapped or unpadded.

        Returns:
            The decoded bytes, or None if decoding fails (the error is logged).
        """
        try:
            # Remove *all* whitespace, not only CR/LF. b64decode silently drops
            # non-alphabet characters, so any left in would skew the padding
            # computed below and make valid input fail with "Incorrect padding".
            clean_base64 = ''.join(base64_data.split())

            # Restore padding to a multiple of 4 characters.
            clean_base64 += '=' * (-len(clean_base64) % 4)

            return base64.b64decode(clean_base64)

        except Exception as e:
            logger.error(f"Error decoding base64 audio: {e}")
            return None

    @staticmethod
    def encode_audio_to_base64(audio_bytes: bytes) -> str:
        """Encode raw bytes as a base64 text string."""
        return base64.b64encode(audio_bytes).decode('utf-8')

    @staticmethod
    def pcm_to_numpy(audio_bytes: bytes, sample_rate: int = 16000, channels: int = 1) -> Optional[np.ndarray]:
        """Convert 16-bit PCM bytes to a float32 array in [-1.0, 1.0).

        Args:
            audio_bytes: Interleaved int16 samples.
            sample_rate: Not used by the conversion.
            channels: When > 1, the result is reshaped to (frames, channels).

        Returns:
            The float32 array, or None if the byte length is not a whole number
            of samples/frames (the error is logged).
        """
        try:
            audio_array = np.frombuffer(audio_bytes, dtype=np.int16)

            # 32768 = 2**15 maps int16 exactly onto [-1.0, 1.0).
            audio_float = audio_array.astype(np.float32) / 32768.0

            if channels > 1:
                audio_float = audio_float.reshape(-1, channels)

            return audio_float

        except Exception as e:
            logger.error(f"Error converting PCM to numpy: {e}")
            return None

    @staticmethod
    def numpy_to_pcm(audio_array: np.ndarray) -> bytes:
        """Convert float samples (nominally in [-1.0, 1.0]) to 16-bit PCM bytes.

        Samples are scaled by 32768 (the inverse of ``pcm_to_numpy``), rounded
        to the nearest integer and clipped to the int16 range, so +1.0 maps to
        32767 and -1.0 to -32768. NaN becomes 0 (silence).

        Returns:
            The PCM bytes, or ``b''`` if conversion fails (the error is logged).
        """
        try:
            samples = np.asarray(audio_array, dtype=np.float32)

            # NaN has no integer representation; casting it to int16 is undefined.
            samples = np.nan_to_num(samples, nan=0.0, posinf=1.0, neginf=-1.0)

            # Mirror pcm_to_numpy's /32768. Round rather than truncate toward
            # zero, so that int16 -> float -> int16 is lossless.
            scaled = np.round(samples * 32768.0)
            audio_int16 = np.clip(scaled, -32768, 32767).astype(np.int16)

            return audio_int16.tobytes()

        except Exception as e:
            logger.error(f"Error converting numpy to PCM: {e}")
            return b''

    @staticmethod
    def calculate_audio_stats(audio_bytes: bytes) -> Dict:
        """Compute basic level statistics for 16-bit PCM bytes.

        Returns:
            A dict with 'length_samples', 'length_bytes', 'rms', 'peak',
            'mean' and 'std' (the last four on the [-1.0, 1.0) float scale).
            Empty input yields zeros. Returns an empty dict if the bytes cannot
            be parsed as int16 (the error is logged).
        """
        if not audio_bytes:
            # Nothing to measure. np.max on an empty array would raise, and
            # np.mean would warn.
            return {
                'length_samples': 0,
                'length_bytes': 0,
                'rms': 0.0,
                'peak': 0.0,
                'mean': 0.0,
                'std': 0.0
            }

        try:
            audio_array = np.frombuffer(audio_bytes, dtype=np.int16)
            audio_float = audio_array.astype(np.float32) / 32768.0

            stats = {
                'length_samples': len(audio_array),
                'length_bytes': len(audio_bytes),
                'rms': float(np.sqrt(np.mean(audio_float**2))),
                'peak': float(np.max(np.abs(audio_float))),
                'mean': float(np.mean(audio_float)),
                'std': float(np.std(audio_float))
            }
            return stats

        except Exception as e:
            logger.error(f"Error calculating audio stats: {e}")
            return {}

class DeviceManager:
    """Registry of audio streaming devices and their per-device buffers.

    ``devices`` maps device_id -> entry dict (info, timestamps, status,
    streaming flag, and an ``AudioBuffer``). ``device_stats`` maps
    device_id -> counters. Both are guarded by ``self.lock``, a plain,
    NON-reentrant ``threading.Lock``. Code that already holds it must call
    the ``_disconnect_locked`` helper rather than public methods that would
    try to acquire it again.
    """

    def __init__(self):
        self.devices = {}
        self.lock = threading.Lock()
        self.device_stats = {}

    def register_device(self, device_id: str, device_info: Dict) -> bool:
        """Register ``device_id`` with a fresh buffer and zeroed stats.

        Re-registering an existing id replaces its entry, including its buffer.
        ``device_info`` is stored as-is; only its optional 'name' key is read
        here, for logging.

        Returns:
            Always True.
        """
        now = time.time()
        with self.lock:
            self.devices[device_id] = {
                'info': device_info,
                'connected_at': now,
                'last_seen': now,
                'status': 'connected',
                'streaming': False,
                'audio_buffer': AudioBuffer()
            }
            self.device_stats[device_id] = {
                'total_chunks': 0,
                'bytes_received': 0,
                'connection_time': now,
                'last_chunk_time': None
            }

        logger.info(f"Device registered: {device_id} ({device_info.get('name', 'Unknown')})")
        return True

    def _disconnect_locked(self, device_id: str) -> None:
        """Mark ``device_id`` disconnected and drain its buffer. Caller must hold ``self.lock``."""
        device = self.devices.get(device_id)
        if device is None:
            return
        device['status'] = 'disconnected'
        device['disconnected_at'] = time.time()
        # A disconnected device can no longer deliver audio; don't report it as streaming.
        device['streaming'] = False
        device['audio_buffer'].clear()

    def unregister_device(self, device_id: str) -> bool:
        """Mark ``device_id`` as disconnected and discard its buffered audio.

        The entry is kept in ``devices`` with status 'disconnected', so its
        info and stats stay queryable. Unknown ids are ignored.

        Returns:
            Always True.
        """
        with self.lock:
            self._disconnect_locked(device_id)

        logger.info(f"Device unregistered: {device_id}")
        return True

    def update_device_heartbeat(self, device_id: str):
        """Refresh ``last_seen`` for a known device; unknown ids are ignored."""
        with self.lock:
            if device_id in self.devices:
                self.devices[device_id]['last_seen'] = time.time()

    def add_audio_chunk(self, device_id: str, audio_data: bytes, timestamp: float) -> bool:
        """Buffer a chunk for a connected device.

        Stats count every chunk accepted from a connected device, including
        chunks the buffer then drops because it is full.

        Returns:
            False if the device is unknown or not connected, otherwise the
            result of ``AudioBuffer.add_chunk`` (False when the buffer was full).
        """
        with self.lock:
            device = self.devices.get(device_id)
            if device is None or device['status'] != 'connected':
                return False

            now = time.time()
            device['streaming'] = True
            device['last_seen'] = now

            stats = self.device_stats[device_id]
            stats['total_chunks'] += 1
            stats['bytes_received'] += len(audio_data)
            stats['last_chunk_time'] = now

            # add_chunk uses put_nowait, so this never blocks while holding the lock.
            return device['audio_buffer'].add_chunk(audio_data, device_id, timestamp)

    def get_audio_chunk(self, device_id: str) -> Optional[Dict]:
        """Pop the next buffered chunk for ``device_id``.

        Returns:
            The chunk dict, or None if the device is unknown or no chunk
            arrived within the buffer's default timeout.
        """
        with self.lock:
            device = self.devices.get(device_id)
        if device is None:
            return None
        # Wait on the per-device buffer OUTSIDE self.lock. get_chunk() can block
        # for its timeout, and holding the manager-wide lock that long would
        # stall every other device's add_audio_chunk and the cleanup thread.
        return device['audio_buffer'].get_chunk()

    def get_connected_devices(self) -> List[str]:
        """Return the ids of devices whose status is 'connected'."""
        with self.lock:
            return [
                device_id for device_id, device in self.devices.items()
                if device['status'] == 'connected'
            ]

    def get_device_info(self, device_id: str) -> Optional[Dict]:
        """Return a shallow copy of a device's entry with its stats under 'stats'.

        The copy still references the live ``info`` dict and ``audio_buffer``.
        Returns None for unknown ids.
        """
        with self.lock:
            if device_id not in self.devices:
                return None
            device = self.devices[device_id].copy()
            device['stats'] = self.device_stats[device_id].copy()
            return device

    def get_all_devices_info(self) -> Dict:
        """Return ``{device_id: entry}`` for every device, as in ``get_device_info``."""
        with self.lock:
            devices_info = {}
            for device_id, device in self.devices.items():
                device_copy = device.copy()
                device_copy['stats'] = self.device_stats[device_id].copy()
                devices_info[device_id] = device_copy
            return devices_info

    def cleanup_inactive_devices(self, timeout_seconds: int = 300):
        """Disconnect connected devices not seen for more than ``timeout_seconds``.

        Expired devices are marked 'disconnected' (as by ``unregister_device``).
        They are not deleted from ``devices``.
        """
        current_time = time.time()
        with self.lock:
            inactive_devices = [
                device_id for device_id, device in self.devices.items()
                if device['status'] == 'connected'
                and current_time - device['last_seen'] > timeout_seconds
            ]
            # Use the lock-held helper. Calling unregister_device() here would
            # try to re-acquire the non-reentrant self.lock and deadlock.
            for device_id in inactive_devices:
                self._disconnect_locked(device_id)

        for device_id in inactive_devices:
            logger.info(f"Cleaning up inactive device: {device_id}")
            logger.info(f"Device unregistered: {device_id}")

class AudioStreamManager:
    """Tracks which devices are actively streaming and runs periodic device cleanup.

    Creating an instance starts a daemon thread that calls
    ``DeviceManager.cleanup_inactive_devices`` every ``cleanup_interval``
    seconds until :meth:`stop` is called. ``running`` is True from
    construction until ``stop()``.
    """

    def __init__(self, cleanup_interval: float = 30.0, inactive_timeout: int = 300):
        self.device_manager = DeviceManager()
        self.active_streams = {}
        self.stream_lock = threading.Lock()
        self.cleanup_interval = cleanup_interval
        self.inactive_timeout = inactive_timeout
        # Set by stop(). The cleanup loop waits on this event instead of
        # sleeping, so shutdown takes effect immediately.
        self._stop_event = threading.Event()
        self.running = True

        # Start cleanup thread
        self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
        self.cleanup_thread.start()

    def stop(self, timeout: Optional[float] = None) -> None:
        """Signal the cleanup thread to exit and wait up to ``timeout`` seconds for it."""
        self.running = False
        self._stop_event.set()
        thread = self.cleanup_thread
        if thread.is_alive() and thread is not threading.current_thread():
            thread.join(timeout)

    def start_stream(self, device_id: str) -> bool:
        """Mark ``device_id`` as actively streaming.

        Returns:
            True if the device is currently connected, otherwise False.
        """
        # Query the device manager before taking stream_lock, so the two locks
        # are never held at the same time.
        if device_id not in self.device_manager.get_connected_devices():
            return False
        with self.stream_lock:
            self.active_streams[device_id] = {
                'started_at': time.time(),
                'active': True
            }
        logger.info(f"Started streaming from device: {device_id}")
        return True

    def stop_stream(self, device_id: str) -> bool:
        """Stop tracking ``device_id`` as streaming.

        Returns:
            True if it had an active-stream entry, otherwise False.
        """
        with self.stream_lock:
            stream = self.active_streams.pop(device_id, None)
        if stream is None:
            return False
        # Flip the flag in case a caller still holds a reference to this dict.
        stream['active'] = False
        logger.info(f"Stopped streaming from device: {device_id}")
        return True

    def is_streaming(self, device_id: str) -> bool:
        """Return whether ``device_id`` has an active stream entry."""
        with self.stream_lock:
            stream = self.active_streams.get(device_id)
            return stream is not None and stream['active']

    def get_streaming_devices(self) -> List[str]:
        """Return the ids of all devices with an active stream."""
        with self.stream_lock:
            return [
                device_id for device_id, stream in self.active_streams.items()
                if stream['active']
            ]

    def _cleanup_loop(self):
        """Expire inactive devices every ``cleanup_interval`` seconds until stopped."""
        while not self._stop_event.is_set():
            try:
                self.device_manager.cleanup_inactive_devices(timeout_seconds=self.inactive_timeout)
                delay = self.cleanup_interval
            except Exception as e:
                # Keep the background thread alive; retry sooner after a failure.
                logger.exception(f"Error in cleanup loop: {e}")
                delay = 5
            self._stop_event.wait(delay)