Spaces:
Runtime error
Runtime error
| """ | |
| Real-time Microphone Streaming Server with Gradio Interface | |
| Hosted on Hugging Face Spaces with WebSocket support for Android devices | |
| """ | |
| import gradio as gr | |
| import asyncio | |
| import websockets | |
| import json | |
| import logging | |
| import threading | |
| import time | |
| import base64 | |
| from datetime import datetime | |
| from typing import Dict, List, Optional | |
| import queue | |
| import numpy as np | |
| from audio_utils import AudioStreamManager, AudioProcessor | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Global variables | |
| stream_manager = AudioStreamManager() | |
| websocket_server = None | |
| connected_clients = {} | |
| client_lock = threading.Lock() | |
| # Message queues for real-time updates | |
| device_update_queue = queue.Queue() | |
| audio_data_queue = queue.Queue() | |
| class GradioWebSocketHandler: | |
| """Handles WebSocket connections from Android devices""" | |
| def __init__(self): | |
| self.running = False | |
| async def handle_client(self, websocket, path): | |
| """Handle individual WebSocket client connection""" | |
| client_id = f"{websocket.remote_address[0]}:{websocket.remote_address[1]}" | |
| client_type = None | |
| device_id = None | |
| logger.info(f"New WebSocket connection from {client_id}") | |
| with client_lock: | |
| connected_clients[client_id] = { | |
| 'websocket': websocket, | |
| 'connected_at': time.time(), | |
| 'type': 'unknown' | |
| } | |
| try: | |
| # Send welcome message | |
| await websocket.send(json.dumps({ | |
| 'type': 'welcome', | |
| 'message': 'Connected to Real-time Audio Streaming Server (Gradio)', | |
| 'timestamp': datetime.now().isoformat(), | |
| 'server_info': { | |
| 'platform': 'Gradio/HuggingFace', | |
| 'version': '1.0.0' | |
| } | |
| })) | |
| async for message in websocket: | |
| try: | |
| data = json.loads(message) | |
| message_type = data.get('type', '') | |
| if message_type == 'register_device': | |
| # Android device registration | |
| client_type = 'device' | |
| device_id = data.get('device_id', '') | |
| device_name = data.get('device_name', 'Unknown Device') | |
| device_model = data.get('device_model', 'Unknown') | |
| # Register device with stream manager | |
| device_info = { | |
| 'name': device_name, | |
| 'model': device_model, | |
| 'client_id': client_id, | |
| 'platform': 'Android' | |
| } | |
| stream_manager.device_manager.register_device(device_id, device_info) | |
| with client_lock: | |
| connected_clients[client_id]['type'] = 'device' | |
| connected_clients[client_id]['device_id'] = device_id | |
| # Send registration confirmation | |
| await websocket.send(json.dumps({ | |
| 'type': 'registration_success', | |
| 'device_id': device_id, | |
| 'message': 'Device registered successfully', | |
| 'timestamp': datetime.now().isoformat() | |
| })) | |
| # Notify Gradio interface | |
| device_update_queue.put({ | |
| 'type': 'device_connected', | |
| 'device_id': device_id, | |
| 'device_info': device_info | |
| }) | |
| logger.info(f"Device registered: {device_id} ({device_name})") | |
| elif message_type == 'audio_chunk': | |
| # Audio data from Android device | |
| if client_type == 'device' and device_id: | |
| audio_data = data.get('data', {}) | |
| # Decode base64 audio | |
| base64_audio = audio_data.get('audio_data', '') | |
| if base64_audio: | |
| audio_bytes = AudioProcessor.decode_base64_audio(base64_audio) | |
| if audio_bytes: | |
| timestamp = data.get('timestamp', time.time()) | |
| # Add to stream manager | |
| stream_manager.device_manager.add_audio_chunk( | |
| device_id, audio_bytes, timestamp | |
| ) | |
| # Queue for Gradio display | |
| audio_data_queue.put({ | |
| 'device_id': device_id, | |
| 'timestamp': timestamp, | |
| 'size_bytes': len(audio_bytes), | |
| 'sample_rate': audio_data.get('sample_rate', 16000) | |
| }) | |
| elif message_type == 'start_streaming': | |
| if client_type == 'device' and device_id: | |
| stream_manager.start_stream(device_id) | |
| await websocket.send(json.dumps({ | |
| 'type': 'streaming_started', | |
| 'device_id': device_id, | |
| 'timestamp': datetime.now().isoformat() | |
| })) | |
| elif message_type == 'stop_streaming': | |
| if client_type == 'device' and device_id: | |
| stream_manager.stop_stream(device_id) | |
| await websocket.send(json.dumps({ | |
| 'type': 'streaming_stopped', | |
| 'device_id': device_id, | |
| 'timestamp': datetime.now().isoformat() | |
| })) | |
| elif message_type == 'ping': | |
| # Heartbeat response | |
| if device_id: | |
| stream_manager.device_manager.update_device_heartbeat(device_id) | |
| await websocket.send(json.dumps({ | |
| 'type': 'pong', | |
| 'timestamp': datetime.now().isoformat() | |
| })) | |
| else: | |
| logger.warning(f"Unknown message type: {message_type}") | |
| except json.JSONDecodeError as e: | |
| logger.error(f"JSON decode error: {e}") | |
| except Exception as e: | |
| logger.error(f"Error processing message: {e}") | |
| except websockets.exceptions.ConnectionClosed: | |
| logger.info(f"WebSocket connection closed: {client_id}") | |
| except Exception as e: | |
| logger.error(f"WebSocket error: {e}") | |
| finally: | |
| # Cleanup on disconnect | |
| with client_lock: | |
| if client_id in connected_clients: | |
| client_info = connected_clients[client_id] | |
| if client_info.get('type') == 'device' and 'device_id' in client_info: | |
| device_id = client_info['device_id'] | |
| stream_manager.device_manager.unregister_device(device_id) | |
| # Notify Gradio interface | |
| device_update_queue.put({ | |
| 'type': 'device_disconnected', | |
| 'device_id': device_id | |
| }) | |
| logger.info(f"Device disconnected: {device_id}") | |
| del connected_clients[client_id] | |
| async def send_command_to_device(self, device_id: str, command: str) -> bool: | |
| """Send command to specific device""" | |
| with client_lock: | |
| for client_id, client_info in connected_clients.items(): | |
| if (client_info.get('type') == 'device' and | |
| client_info.get('device_id') == device_id): | |
| try: | |
| await client_info['websocket'].send(json.dumps({ | |
| 'type': command, | |
| 'timestamp': datetime.now().isoformat() | |
| })) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error sending command to device {device_id}: {e}") | |
| return False | |
| return False | |
| # WebSocket handler instance | |
| ws_handler = GradioWebSocketHandler() | |
| def start_websocket_server(): | |
| """Start WebSocket server in background thread""" | |
| global websocket_server | |
| async def run_server(): | |
| global websocket_server | |
| # Use port 7861 for WebSocket (Gradio uses 7860) | |
| websocket_server = await websockets.serve( | |
| ws_handler.handle_client, | |
| "0.0.0.0", | |
| 7861, | |
| ping_interval=30, | |
| ping_timeout=60 | |
| ) | |
| logger.info("WebSocket server started on port 7861") | |
| await websocket_server.wait_closed() | |
| def run_in_thread(): | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| loop.run_until_complete(run_server()) | |
| server_thread = threading.Thread(target=run_in_thread, daemon=True) | |
| server_thread.start() | |
| def get_connected_devices() -> List[Dict]: | |
| """Get list of connected devices for Gradio display""" | |
| devices = stream_manager.device_manager.get_all_devices_info() | |
| device_list = [] | |
| for device_id, device_info in devices.items(): | |
| if device_info.get('status') == 'connected': | |
| device_data = { | |
| 'device_id': device_id, | |
| 'name': device_info.get('info', {}).get('name', 'Unknown'), | |
| 'model': device_info.get('info', {}).get('model', 'Unknown'), | |
| 'connected_at': datetime.fromtimestamp(device_info.get('connected_at', 0)).strftime('%H:%M:%S'), | |
| 'streaming': device_info.get('streaming', False), | |
| 'total_chunks': device_info.get('stats', {}).get('total_chunks', 0), | |
| 'bytes_received': device_info.get('stats', {}).get('bytes_received', 0), | |
| 'last_seen': 'Now' if time.time() - device_info.get('last_seen', 0) < 10 else f"{int(time.time() - device_info.get('last_seen', 0))}s ago" | |
| } | |
| device_list.append(device_data) | |
| return device_list | |
| def start_streaming(device_id: str) -> str: | |
| """Start streaming from a device""" | |
| if stream_manager.start_stream(device_id): | |
| # Send command to device | |
| asyncio.run(ws_handler.send_command_to_device(device_id, 'start_streaming')) | |
| return f"β Started streaming from device: {device_id}" | |
| else: | |
| return f"β Failed to start streaming from device: {device_id}" | |
| def stop_streaming(device_id: str) -> str: | |
| """Stop streaming from a device""" | |
| if stream_manager.stop_stream(device_id): | |
| # Send command to device | |
| asyncio.run(ws_handler.send_command_to_device(device_id, 'stop_streaming')) | |
| return f"π Stopped streaming from device: {device_id}" | |
| else: | |
| return f"β Failed to stop streaming from device: {device_id}" | |
| def get_server_stats() -> Dict: | |
| """Get server statistics""" | |
| devices = stream_manager.device_manager.get_all_devices_info() | |
| connected_count = len([d for d in devices.values() if d.get('status') == 'connected']) | |
| streaming_count = len(stream_manager.get_streaming_devices()) | |
| total_bytes = sum([ | |
| device.get('stats', {}).get('bytes_received', 0) | |
| for device in devices.values() | |
| ]) | |
| with client_lock: | |
| total_connections = len(connected_clients) | |
| return { | |
| 'connected_devices': connected_count, | |
| 'streaming_devices': streaming_count, | |
| 'total_connections': total_connections, | |
| 'total_data_mb': round(total_bytes / (1024 * 1024), 2), | |
| 'server_uptime': time.strftime('%H:%M:%S', time.gmtime(time.time() - server_start_time)) | |
| } | |
| def update_device_list(): | |
| """Update device list for Gradio""" | |
| devices = get_connected_devices() | |
| if not devices: | |
| return "No devices connected. Install the Android app and connect to start streaming." | |
| # Create HTML table for devices | |
| html = """ | |
| <div style="font-family: Arial, sans-serif;"> | |
| <h3>π± Connected Devices</h3> | |
| <table style="width: 100%; border-collapse: collapse; margin-top: 10px;"> | |
| <thead> | |
| <tr style="background-color: #f0f0f0;"> | |
| <th style="border: 1px solid #ddd; padding: 8px; text-align: left;">Device</th> | |
| <th style="border: 1px solid #ddd; padding: 8px; text-align: left;">Status</th> | |
| <th style="border: 1px solid #ddd; padding: 8px; text-align: left;">Data</th> | |
| <th style="border: 1px solid #ddd; padding: 8px; text-align: left;">Last Seen</th> | |
| </tr> | |
| </thead> | |
| <tbody> | |
| """ | |
| for device in devices: | |
| status_icon = "ποΈ Streaming" if device['streaming'] else "β Connected" | |
| status_color = "#28a745" if device['streaming'] else "#007bff" | |
| html += f""" | |
| <tr> | |
| <td style="border: 1px solid #ddd; padding: 8px;"> | |
| <strong>{device['name']}</strong><br> | |
| <small style="color: #666;">{device['device_id'][:12]}...</small> | |
| </td> | |
| <td style="border: 1px solid #ddd; padding: 8px;"> | |
| <span style="color: {status_color}; font-weight: bold;">{status_icon}</span><br> | |
| <small>Since: {device['connected_at']}</small> | |
| </td> | |
| <td style="border: 1px solid #ddd; padding: 8px;"> | |
| <strong>{device['total_chunks']}</strong> chunks<br> | |
| <small>{round(device['bytes_received']/1024, 1)} KB</small> | |
| </td> | |
| <td style="border: 1px solid #ddd; padding: 8px;"> | |
| {device['last_seen']} | |
| </td> | |
| </tr> | |
| """ | |
| html += """ | |
| </tbody> | |
| </table> | |
| </div> | |
| """ | |
| return html | |
| def create_gradio_interface(): | |
| """Create the main Gradio interface""" | |
| with gr.Blocks(title="π€ Real-time Mic Streaming Server", theme=gr.themes.Soft()) as interface: | |
| # Header | |
| gr.HTML(""" | |
| <div style="text-align: center; padding: 20px; background: linear-gradient(90deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 10px; margin-bottom: 20px;"> | |
| <h1 style="margin: 0; font-size: 2.5em;">π€ Real-time Microphone Streaming</h1> | |
| <p style="margin: 10px 0 0 0; font-size: 1.2em;">Android Device Audio Streaming Server</p> | |
| </div> | |
| """) | |
| # Connection info | |
| with gr.Row(): | |
| gr.HTML(""" | |
| <div style="background: #e8f4f8; padding: 15px; border-radius: 8px; border-left: 5px solid #007bff;"> | |
| <h3 style="margin: 0 0 10px 0;">π‘ Connection Information</h3> | |
| <p style="margin: 5px 0;"><strong>WebSocket URL:</strong> <code>wss://your-space-name.hf.space:7861</code></p> | |
| <p style="margin: 5px 0;"><strong>Protocol:</strong> WebSocket with JSON messages</p> | |
| <p style="margin: 5px 0;">Configure your Android app to connect to this WebSocket URL.</p> | |
| </div> | |
| """) | |
| # Server Statistics | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| connected_devices_display = gr.Number(label="π± Connected Devices", value=0, interactive=False) | |
| with gr.Column(scale=1): | |
| streaming_devices_display = gr.Number(label="ποΈ Streaming Devices", value=0, interactive=False) | |
| with gr.Column(scale=1): | |
| total_data_display = gr.Number(label="π Total Data (MB)", value=0.0, interactive=False) | |
| with gr.Column(scale=1): | |
| uptime_display = gr.Textbox(label="β±οΈ Server Uptime", value="00:00:00", interactive=False) | |
| # Device List | |
| device_list_display = gr.HTML( | |
| value="No devices connected. Install the Android app and connect to start streaming.", | |
| label="Connected Devices" | |
| ) | |
| # Control Buttons | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| device_id_input = gr.Textbox( | |
| label="Device ID", | |
| placeholder="Enter device ID to control", | |
| info="Copy device ID from the table above" | |
| ) | |
| with gr.Column(scale=1): | |
| start_btn = gr.Button("π€ Start Streaming", variant="primary") | |
| stop_btn = gr.Button("π Stop Streaming", variant="secondary") | |
| # Control feedback | |
| control_output = gr.Textbox(label="Control Status", interactive=False) | |
| # Audio Data Monitor (recent activity) | |
| audio_monitor = gr.HTML( | |
| value="<p>No recent audio data...</p>", | |
| label="Recent Audio Activity" | |
| ) | |
| # Event handlers | |
| start_btn.click( | |
| fn=start_streaming, | |
| inputs=[device_id_input], | |
| outputs=[control_output] | |
| ) | |
| stop_btn.click( | |
| fn=stop_streaming, | |
| inputs=[device_id_input], | |
| outputs=[control_output] | |
| ) | |
| # Auto-refresh components | |
| def update_all(): | |
| stats = get_server_stats() | |
| devices_html = update_device_list() | |
| # Get recent audio activity | |
| recent_audio = [] | |
| while not audio_data_queue.empty(): | |
| try: | |
| audio_info = audio_data_queue.get_nowait() | |
| recent_audio.append(audio_info) | |
| except queue.Empty: | |
| break | |
| if recent_audio: | |
| audio_html = "<div style='font-family: monospace; font-size: 12px;'>" | |
| audio_html += "<h4>Recent Audio Chunks:</h4>" | |
| for audio in recent_audio[-10:]: # Show last 10 chunks | |
| timestamp = datetime.fromtimestamp(audio['timestamp']).strftime('%H:%M:%S') | |
| audio_html += f"<p>{timestamp} - Device: {audio['device_id'][:12]}... | Size: {audio['size_bytes']} bytes | Rate: {audio['sample_rate']} Hz</p>" | |
| audio_html += "</div>" | |
| else: | |
| audio_html = "<p>No recent audio data...</p>" | |
| return ( | |
| stats['connected_devices'], | |
| stats['streaming_devices'], | |
| stats['total_data_mb'], | |
| stats['server_uptime'], | |
| devices_html, | |
| audio_html | |
| ) | |
| # Set up auto-refresh every 2 seconds | |
| interface.load( | |
| fn=update_all, | |
| outputs=[ | |
| connected_devices_display, | |
| streaming_devices_display, | |
| total_data_display, | |
| uptime_display, | |
| device_list_display, | |
| audio_monitor | |
| ], | |
| every=2 | |
| ) | |
| return interface | |
| # Global server start time | |
| server_start_time = time.time() | |
| def main(): | |
| """Main application entry point""" | |
| logger.info("Starting Real-time Microphone Streaming Server...") | |
| # Start WebSocket server | |
| start_websocket_server() | |
| # Wait a moment for WebSocket server to start | |
| time.sleep(2) | |
| # Create and launch Gradio interface | |
| interface = create_gradio_interface() | |
| # Launch with public access for Hugging Face | |
| interface.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, # Set to True for public tunnels during development | |
| debug=False | |
| ) | |
| if __name__ == "__main__": | |
| main() | |