Spaces:
Runtime error
Runtime error
| """ | |
| Audio Processing Utilities for Real-time Microphone Streaming | |
| Handles audio data encoding, decoding, and processing for Gradio backend. | |
| """ | |
| import base64 | |
| import numpy as np | |
| import threading | |
| import queue | |
| import time | |
| import logging | |
| from datetime import datetime | |
| from typing import Optional, Dict, List, Tuple | |
| logger = logging.getLogger(__name__) | |
class AudioBuffer:
    """Bounded FIFO of audio chunks with simple bookkeeping counters.

    Chunks are stored in a ``queue.Queue`` created with ``maxsize=max_size``;
    the counters live in a private dict that is only touched while
    ``self.lock`` is held.
    """

    def __init__(self, max_size: int = 1000):
        """Create an empty buffer that holds at most ``max_size`` chunks."""
        self.max_size = max_size
        self.buffer = queue.Queue(maxsize=max_size)
        self.lock = threading.Lock()
        self._stats = dict(total_chunks=0, dropped_chunks=0, buffer_size=0)

    def add_chunk(self, audio_data: bytes, device_id: str, timestamp: float) -> bool:
        """Enqueue one chunk without blocking.

        Returns:
            True when the chunk was stored, False when the queue was full
            (the chunk is dropped and counted in ``dropped_chunks``).
        """
        entry = {
            'audio_data': audio_data,
            'device_id': device_id,
            'timestamp': timestamp,
            'server_received': time.time(),
        }
        try:
            self.buffer.put_nowait(entry)
        except queue.Full:
            with self.lock:
                self._stats['dropped_chunks'] += 1
            logger.warning(f"Audio buffer full, dropped chunk from device {device_id}")
            return False

        with self.lock:
            self._stats['total_chunks'] += 1
            self._stats['buffer_size'] = self.buffer.qsize()
        return True

    def get_chunk(self, timeout: float = 0.1) -> Optional[Dict]:
        """Pop the oldest chunk, waiting up to ``timeout`` seconds.

        Returns:
            The chunk dict, or None if nothing arrived within the timeout.
        """
        try:
            item = self.buffer.get(timeout=timeout)
        except queue.Empty:
            return None

        with self.lock:
            self._stats['buffer_size'] = self.buffer.qsize()
        return item

    def get_stats(self) -> Dict:
        """Return a copy of the counters dict."""
        with self.lock:
            snapshot = dict(self._stats)
        return snapshot

    def clear(self):
        """Discard every queued chunk and reset ``buffer_size`` to zero."""
        with self.lock:
            while True:
                try:
                    self.buffer.get_nowait()
                except queue.Empty:
                    break
            self._stats['buffer_size'] = 0
class AudioProcessor:
    """Stateless helpers for converting and inspecting 16-bit PCM audio.

    Every helper is a ``@staticmethod`` so it can be called either on the
    class (``AudioProcessor.pcm_to_numpy(...)``) or on an instance. Without
    the decorator, an instance call would pass the instance as the first
    argument and fail.
    """

    @staticmethod
    def decode_base64_audio(base64_data: str) -> Optional[bytes]:
        """Decode a base64 string to raw bytes.

        CR/LF characters and surrounding whitespace are removed, and missing
        ``=`` padding is appended before decoding.

        Returns:
            The decoded bytes, or None if decoding fails (the error is logged).
        """
        try:
            # Clean the base64 string
            clean_base64 = base64_data.replace('\n', '').replace('\r', '').strip()
            # Pad to a multiple of four characters, as b64decode requires
            missing_padding = len(clean_base64) % 4
            if missing_padding:
                clean_base64 += '=' * (4 - missing_padding)
            return base64.b64decode(clean_base64)
        except Exception as e:
            logger.error(f"Error decoding base64 audio: {e}")
            return None

    @staticmethod
    def encode_audio_to_base64(audio_bytes: bytes) -> str:
        """Encode raw bytes as a base64 text string."""
        return base64.b64encode(audio_bytes).decode('utf-8')

    @staticmethod
    def pcm_to_numpy(audio_bytes: bytes, sample_rate: int = 16000, channels: int = 1) -> Optional[np.ndarray]:
        """Convert 16-bit signed PCM bytes to a float32 array.

        Args:
            audio_bytes: Raw samples, read as native-endian ``int16``.
            sample_rate: Accepted for interface compatibility; not used here.
            channels: When greater than 1, the result is reshaped to
                ``(-1, channels)``.

        Returns:
            float32 samples scaled by 1/32768, or None if conversion fails
            (for example when the byte count is not a multiple of the sample
            size or does not divide evenly into ``channels``).
        """
        try:
            # Convert bytes to 16-bit signed integers
            audio_array = np.frombuffer(audio_bytes, dtype=np.int16)
            # Normalize to float32 range [-1.0, 1.0)
            audio_float = audio_array.astype(np.float32) / 32768.0
            # Handle multi-channel audio
            if channels > 1:
                audio_float = audio_float.reshape(-1, channels)
            return audio_float
        except Exception as e:
            logger.error(f"Error converting PCM to numpy: {e}")
            return None

    @staticmethod
    def numpy_to_pcm(audio_array: np.ndarray) -> bytes:
        """Convert a float array to 16-bit signed PCM bytes.

        Values are cast to float32, clipped to [-1.0, 1.0], scaled by 32767
        and cast to ``int16``.

        Returns:
            The PCM bytes, or ``b''`` if conversion fails (the error is logged).
        """
        try:
            # Ensure array is float32
            if audio_array.dtype != np.float32:
                audio_array = audio_array.astype(np.float32)
            # Clamp so the scaled values cannot overflow int16
            audio_array = np.clip(audio_array, -1.0, 1.0)
            # Convert to 16-bit signed integers
            audio_int16 = (audio_array * 32767).astype(np.int16)
            return audio_int16.tobytes()
        except Exception as e:
            logger.error(f"Error converting numpy to PCM: {e}")
            return b''

    @staticmethod
    def calculate_audio_stats(audio_bytes: bytes) -> Dict:
        """Compute basic statistics over 16-bit signed PCM bytes.

        Returns:
            A dict with ``length_samples``, ``length_bytes``, ``rms``,
            ``peak``, ``mean`` and ``std`` (the last four computed on samples
            scaled by 1/32768), or an empty dict if the computation fails,
            e.g. for empty input where the peak is undefined.
        """
        try:
            audio_array = np.frombuffer(audio_bytes, dtype=np.int16)
            audio_float = audio_array.astype(np.float32) / 32768.0
            return {
                'length_samples': len(audio_array),
                'length_bytes': len(audio_bytes),
                'rms': float(np.sqrt(np.mean(audio_float**2))),
                'peak': float(np.max(np.abs(audio_float))),
                'mean': float(np.mean(audio_float)),
                'std': float(np.std(audio_float)),
            }
        except Exception as e:
            logger.error(f"Error calculating audio stats: {e}")
            return {}
class DeviceManager:
    """Registry of audio streaming devices and their per-device counters.

    ``self.devices`` maps a device id to its record (info, timestamps,
    status, streaming flag and an ``AudioBuffer``); ``self.device_stats``
    maps the same id to chunk/byte counters. Both dicts are only mutated
    while ``self.lock`` is held.

    ``self.lock`` is a plain ``threading.Lock`` (not re-entrant), so no
    public method is ever called while the lock is held; shared work goes
    through ``_disconnect_locked`` instead. Buffer operations that can
    block are performed after the lock has been released.
    """

    def __init__(self):
        """Create an empty registry."""
        self.devices = {}
        self.lock = threading.Lock()
        self.device_stats = {}

    def register_device(self, device_id: str, device_info: Dict) -> bool:
        """Register a device, replacing any existing record with the same id.

        Returns:
            Always True.
        """
        now = time.time()
        with self.lock:
            self.devices[device_id] = {
                'info': device_info,
                'connected_at': now,
                'last_seen': now,
                'status': 'connected',
                'streaming': False,
                'audio_buffer': AudioBuffer(),
            }
            self.device_stats[device_id] = {
                'total_chunks': 0,
                'bytes_received': 0,
                'connection_time': now,
                'last_chunk_time': None,
            }
        logger.info(f"Device registered: {device_id} ({device_info.get('name', 'Unknown')})")
        return True

    def _disconnect_locked(self, device_id: str) -> bool:
        """Mark a device disconnected and drop its queued audio.

        The caller must already hold ``self.lock``.

        Returns:
            True if the device exists, False otherwise.
        """
        device = self.devices.get(device_id)
        if device is None:
            return False
        device['status'] = 'disconnected'
        device['disconnected_at'] = time.time()
        # Clear the audio buffer so stale chunks are not delivered later
        device['audio_buffer'].clear()
        return True

    def unregister_device(self, device_id: str) -> bool:
        """Mark a device as disconnected; its record is kept in the registry.

        Returns:
            True if the device was known, False if no such device exists.
        """
        with self.lock:
            found = self._disconnect_locked(device_id)
        if found:
            logger.info(f"Device unregistered: {device_id}")
        return found

    def update_device_heartbeat(self, device_id: str):
        """Refresh ``last_seen`` for a known device; unknown ids are ignored."""
        with self.lock:
            device = self.devices.get(device_id)
            if device is not None:
                device['last_seen'] = time.time()

    def add_audio_chunk(self, device_id: str, audio_data: bytes, timestamp: float) -> bool:
        """Record and enqueue an audio chunk for a connected device.

        The per-device counters are updated before the enqueue attempt, so
        a chunk that is dropped because the buffer is full is still counted
        in ``total_chunks`` and ``bytes_received``.

        Returns:
            False if the device is unknown, not connected, or its buffer is
            full; True if the chunk was queued.
        """
        with self.lock:
            device = self.devices.get(device_id)
            if device is None or device['status'] != 'connected':
                return False
            now = time.time()
            # Update device streaming status
            device['streaming'] = True
            device['last_seen'] = now
            # Update stats
            stats = self.device_stats[device_id]
            stats['total_chunks'] += 1
            stats['bytes_received'] += len(audio_data)
            stats['last_chunk_time'] = now
            buffer = device['audio_buffer']
        # The buffer does its own locking; enqueue outside the registry lock
        return buffer.add_chunk(audio_data, device_id, timestamp)

    def get_audio_chunk(self, device_id: str) -> Optional[Dict]:
        """Return the next queued chunk for a device, or None.

        None is returned when the device is unknown or when the buffer
        yields nothing within its default timeout.
        """
        with self.lock:
            device = self.devices.get(device_id)
            if device is None:
                return None
            buffer = device['audio_buffer']
        # get_chunk() may block waiting for data; do that without holding
        # the registry lock so other devices are not stalled meanwhile.
        return buffer.get_chunk()

    def get_connected_devices(self) -> List[str]:
        """Return the ids of all devices whose status is ``'connected'``."""
        with self.lock:
            return [
                device_id for device_id, device in self.devices.items()
                if device['status'] == 'connected'
            ]

    def get_device_info(self, device_id: str) -> Optional[Dict]:
        """Return a shallow copy of a device record plus a ``'stats'`` copy.

        The copy is shallow: nested objects such as ``'audio_buffer'`` and
        ``'info'`` are shared with the registry.

        Returns:
            The merged dict, or None if the device is unknown.
        """
        with self.lock:
            device = self.devices.get(device_id)
            if device is None:
                return None
            snapshot = device.copy()
            snapshot['stats'] = self.device_stats[device_id].copy()
            return snapshot

    def get_all_devices_info(self) -> Dict:
        """Return ``{device_id: record}`` with shallow copies for all devices."""
        with self.lock:
            devices_info = {}
            for device_id, device in self.devices.items():
                device_copy = device.copy()
                device_copy['stats'] = self.device_stats[device_id].copy()
                devices_info[device_id] = device_copy
            return devices_info

    def cleanup_inactive_devices(self, timeout_seconds: int = 300):
        """Disconnect connected devices not seen for ``timeout_seconds``.

        Devices are marked disconnected (not deleted) through the same path
        as ``unregister_device``. The lock-held helper is used instead of
        calling ``unregister_device`` directly, because re-acquiring the
        non-reentrant lock from inside the ``with`` block would deadlock.
        """
        current_time = time.time()
        with self.lock:
            inactive_devices = [
                device_id for device_id, device in self.devices.items()
                if device['status'] == 'connected'
                and current_time - device['last_seen'] > timeout_seconds
            ]
            for device_id in inactive_devices:
                self._disconnect_locked(device_id)
        # Log after releasing the lock to keep the critical section short
        for device_id in inactive_devices:
            logger.info(f"Cleaning up inactive device: {device_id}")
            logger.info(f"Device unregistered: {device_id}")
class AudioStreamManager:
    """Tracks which registered devices currently have an active stream.

    Owns a ``DeviceManager`` and, on construction, starts a daemon thread
    that periodically asks it to clean up inactive devices.
    """

    def __init__(self):
        """Create the device manager, stream table and cleanup thread."""
        self.device_manager = DeviceManager()
        self.active_streams = {}
        self.stream_lock = threading.Lock()
        self.running = False
        # Launch the background cleanup worker immediately
        worker = threading.Thread(target=self._cleanup_loop, daemon=True)
        self.cleanup_thread = worker
        worker.start()

    def start_stream(self, device_id: str) -> bool:
        """Mark a connected device as streaming.

        Returns:
            True if the device is currently connected, False otherwise.
        """
        with self.stream_lock:
            if device_id not in self.device_manager.get_connected_devices():
                return False
            self.active_streams[device_id] = {
                'started_at': time.time(),
                'active': True,
            }
            logger.info(f"Started streaming from device: {device_id}")
            return True

    def stop_stream(self, device_id: str) -> bool:
        """Remove a device's stream entry.

        Returns:
            True if an entry existed and was removed, False otherwise.
        """
        with self.stream_lock:
            stream = self.active_streams.pop(device_id, None)
            if stream is None:
                return False
            # Flag the entry inactive for anyone still holding a reference
            stream['active'] = False
            logger.info(f"Stopped streaming from device: {device_id}")
            return True

    def is_streaming(self, device_id: str) -> bool:
        """Report whether the device has a stream entry flagged active."""
        with self.stream_lock:
            if device_id not in self.active_streams:
                return False
            return self.active_streams[device_id]['active']

    def get_streaming_devices(self) -> List[str]:
        """Return the ids of all devices whose stream entry is active."""
        streaming = []
        with self.stream_lock:
            for device_id, stream in self.active_streams.items():
                if stream['active']:
                    streaming.append(device_id)
        return streaming

    def _cleanup_loop(self):
        """Run device cleanup forever: every 30 s, or 5 s after an error."""
        normal_pause, error_pause = 30, 5
        while True:
            try:
                self.device_manager.cleanup_inactive_devices()
                time.sleep(normal_pause)
            except Exception as e:
                logger.error(f"Error in cleanup loop: {e}")
                time.sleep(error_pause)