"""
WebSocket server for real-time audio processing simulation.

Each WebSocket session maintains:
- A StreamRouter that simulates multiple microphone streams through the DSP pipeline
- A broadcast loop that pushes audio_update messages every 100 ms
- Command handling for mute / priority / chairman changes
"""
import asyncio
import json
import logging
import random
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional, Set

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

from audio_engine.stream_router import StreamRouter
# APIRouter instance tagged "WebSocket".
# NOTE(review): no route registrations against this router are visible in this
# file - confirm where the endpoint coroutines below are attached to it.
router = APIRouter(tags=["WebSocket"])
| # --------------------------------------------------------------------------- | |
| # Default microphone roster used when a session is first created | |
| # --------------------------------------------------------------------------- | |
# Roster seeded into every new session, one row per microphone:
# (mic_id, display name, priority, is_chairman).
_DEFAULT_MICROPHONES = [
    {"mic_id": mic_id, "name": name, "priority": priority, "is_chairman": is_chairman}
    for mic_id, name, priority, is_chairman in (
        ("mic_1", "Participant 1", 5, False),
        ("mic_2", "Participant 2", 5, False),
        ("mic_3", "Participant 3", 4, False),
        ("mic_4", "Chairman", 9, True),
    )
]
| # --------------------------------------------------------------------------- | |
| # In-memory session registry | |
| # --------------------------------------------------------------------------- | |
class AudioSession:
    """
    One logical audio session (e.g. a meeting room).

    Multiple WebSocket clients can subscribe to the same session and all
    receive identical broadcast updates.

    The session owns a StreamRouter seeded from _DEFAULT_MICROPHONES, a
    per-microphone speech/silence state machine that drives the simulated
    input, and a background task that ticks every 100 ms.
    """

    def __init__(self, session_id: str):
        """Create the session and register the default microphone roster."""
        self.session_id = session_id
        self.router = StreamRouter(sample_rate=16000, frame_size=512)
        self.clients: Set[WebSocket] = set()
        self._sim_task: Optional[asyncio.Task] = None
        self._running = False
        # Speech-pattern state for a more realistic simulation
        self._speech_states: Dict[str, dict] = {}
        for mic_def in _DEFAULT_MICROPHONES:
            self.router.add_stream(
                mic_id=mic_def["mic_id"],
                name=mic_def["name"],
                priority=mic_def["priority"],
                is_chairman=mic_def["is_chairman"],
            )
            self._speech_states[mic_def["mic_id"]] = self._new_speech_state()

    @staticmethod
    def _new_speech_state() -> dict:
        """Return the initial simulation state for one microphone: silent for 5-20 ticks."""
        return {
            "speaking": False,
            "ticks_remaining": 0,
            "silence_remaining": random.randint(5, 20),
        }

    # ------------------------------------------------------------------
    # Client management
    # ------------------------------------------------------------------
    def add_client(self, ws: "WebSocket") -> None:
        """Subscribe *ws* to this session's broadcasts."""
        self.clients.add(ws)

    def remove_client(self, ws: "WebSocket") -> None:
        """Unsubscribe *ws*; a no-op if it is not subscribed."""
        self.clients.discard(ws)

    # ------------------------------------------------------------------
    # Simulation loop
    # ------------------------------------------------------------------
    def start(self) -> None:
        """Start the background simulation task if it is not already running."""
        if not self._running:
            self._running = True
            self._sim_task = asyncio.create_task(self._simulation_loop())

    def stop(self) -> None:
        """Stop the simulation and cancel the background task if it is still alive."""
        self._running = False
        if self._sim_task and not self._sim_task.done():
            self._sim_task.cancel()

    async def _simulation_loop(self) -> None:
        """Simulate audio frames at 10 Hz and broadcast results to all clients."""
        # Log only the first failure of a streak so a persistently broken tick
        # does not emit ten tracebacks per second.
        tick_failing = False
        while self._running:
            try:
                payload = await self._tick()
                if self.clients:
                    message = json.dumps(payload, default=str)
                    dead: Set[WebSocket] = set()
                    # Iterate a snapshot: clients may join or leave while we await.
                    for ws in list(self.clients):
                        try:
                            await ws.send_text(message)
                        except Exception:
                            # Any send failure is treated as a dropped connection.
                            dead.add(ws)
                    self.clients -= dead
                tick_failing = False
            except asyncio.CancelledError:
                break
            except Exception:
                # Keep the loop alive, but make the failure visible.
                if not tick_failing:
                    logging.getLogger(__name__).exception(
                        "Simulation tick failed for session %s", self.session_id
                    )
                    tick_failing = True
            await asyncio.sleep(0.1)  # 100 ms tick

    async def _tick(self) -> dict:
        """Advance the simulation by one tick and return the broadcast payload."""
        microphone_updates: dict = {}
        all_states = self.router.get_stream_states()
        for mic_id, sim in self._speech_states.items():
            state = all_states.get(mic_id, {})
            if state.get("is_muted", False):
                # Muted - no audio processing needed; report router state only.
                microphone_updates[mic_id] = _build_mic_payload(mic_id, state, {})
                continue

            # Advance the speech / silence FSM
            if sim["speaking"]:
                sim["ticks_remaining"] -= 1
                if sim["ticks_remaining"] <= 0:
                    sim["speaking"] = False
                    sim["silence_remaining"] = random.randint(5, 30)
            else:
                sim["silence_remaining"] -= 1
                if sim["silence_remaining"] <= 0:
                    sim["speaking"] = True
                    sim["ticks_remaining"] = random.randint(10, 40)

            has_speech = sim["speaking"]
            # Speech frames are simulated louder than background frames.
            if has_speech:
                db_level = random.uniform(-32.0, -25.0)
            else:
                db_level = random.uniform(-70.0, -55.0)
            frame = self.router.simulate_audio_frame(
                mic_id, db_level=db_level, has_speech=has_speech
            )
            result = self.router.process_frame(mic_id, frame)
            microphone_updates[mic_id] = _build_mic_payload(mic_id, state, result)

        return {
            "type": "audio_update",
            "session_id": self.session_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "microphones": microphone_updates,
            "active_speaker": _find_active_speaker(microphone_updates),
            "system_health": _generate_health_metrics(),
        }

    # ------------------------------------------------------------------
    # Command handlers
    # ------------------------------------------------------------------
    def handle_command(self, command: dict) -> dict:
        """
        Dispatch an inbound control command. Returns an ack/error dict.

        Recognised ``type`` values: ``mute``, ``set_priority``,
        ``set_chairman``, ``add_microphone``, ``remove_microphone`` and
        ``get_states``. Malformed input (a non-object payload, a missing
        ``mic_id``, a non-integer priority, an unknown type) yields a
        ``{"type": "error", ...}`` dict instead of raising, so one bad
        message cannot tear down the caller's connection.
        """
        # json.loads may legitimately produce a list, string, number or None.
        if not isinstance(command, dict):
            return {"type": "error", "message": "Command must be a JSON object"}

        cmd_type = command.get("type")
        mic_id: Optional[str] = command.get("mic_id")

        if cmd_type == "mute":
            if not mic_id:
                return {"type": "error", "message": "mic_id required for mute"}
            muted: bool = command.get("muted", True)
            self.router.set_microphone_muted(mic_id, muted)
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id, "muted": muted}

        if cmd_type == "set_priority":
            priority = command.get("priority")
            if not mic_id or priority is None:
                return {"type": "error", "message": "mic_id and priority required"}
            try:
                priority = int(priority)
            except (TypeError, ValueError):
                return {"type": "error", "message": "priority must be an integer 1–10"}
            # NOTE(review): the 1-10 range is not enforced here; presumably the
            # router validates or clamps it - confirm.
            self.router.set_microphone_priority(mic_id, priority)
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id, "priority": priority}

        if cmd_type == "set_chairman":
            if not mic_id:
                return {"type": "error", "message": "mic_id required for set_chairman"}
            self.router.set_chairman(mic_id)
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id}

        if cmd_type == "add_microphone":
            # Validate everything before touching the router.
            if not mic_id:
                return {"type": "error", "message": "mic_id required"}
            try:
                new_priority = int(command.get("priority", 5))
            except (TypeError, ValueError):
                return {"type": "error", "message": "priority must be an integer 1–10"}
            name: str = command.get("name", f"Mic {mic_id}")
            is_chairman: bool = bool(command.get("is_chairman", False))
            added = self.router.add_stream(mic_id, name, new_priority, is_chairman)
            if added:
                self._speech_states[mic_id] = self._new_speech_state()
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id, "added": added}

        if cmd_type == "remove_microphone":
            if not mic_id:
                return {"type": "error", "message": "mic_id required"}
            self.router.remove_stream(mic_id)
            self._speech_states.pop(mic_id, None)
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id}

        if cmd_type == "get_states":
            return {
                "type": "states",
                "microphones": self.router.get_stream_states(),
            }

        return {"type": "error", "message": f"Unknown command type: {cmd_type!r}"}
| # --------------------------------------------------------------------------- | |
| # Session registry (keyed by session_id) | |
| # --------------------------------------------------------------------------- | |
# Live sessions, keyed by session_id.
_sessions: Dict[str, AudioSession] = {}


def _get_or_create_session(session_id: str) -> AudioSession:
    """Return the session registered under *session_id*, creating it on first use.

    The session's simulation is (re)started on every call; ``start()`` does
    nothing when the session is already running.
    """
    existing = _sessions.get(session_id)
    if existing is None:
        existing = AudioSession(session_id)
        _sessions[session_id] = existing
    existing.start()
    return existing
| # --------------------------------------------------------------------------- | |
| # WebSocket endpoint | |
| # --------------------------------------------------------------------------- | |
async def websocket_endpoint(websocket: WebSocket, session_id: str) -> None:
    """Serve one client of an audio session.

    Accepts the connection, subscribes it to the session identified by
    *session_id* (created on demand), sends a ``connected`` greeting with the
    current microphone states, then answers each inbound JSON command with
    the result of ``session.handle_command``. When the client goes away it is
    unsubscribed, and a session left without clients is stopped and removed
    from the registry.
    """
    await websocket.accept()
    session = _get_or_create_session(session_id)
    session.add_client(websocket)
    try:
        # Greet the new client with current microphone states. This happens
        # inside the try so a client that vanishes before the greeting lands
        # is still unsubscribed by the finally block below.
        await websocket.send_text(
            json.dumps(
                {
                    "type": "connected",
                    "session_id": session_id,
                    "microphones": session.router.get_stream_states(),
                    "message": "Connected to SD-MMMS audio session",
                },
                default=str,
            )
        )
        while True:
            raw = await websocket.receive_text()
            try:
                command = json.loads(raw)
            except json.JSONDecodeError:
                await websocket.send_text(
                    json.dumps({"type": "error", "message": "Invalid JSON"})
                )
                continue
            # Valid JSON is not necessarily an object (e.g. `[]` or `"mute"`).
            if not isinstance(command, dict):
                await websocket.send_text(
                    json.dumps({"type": "error", "message": "Command must be a JSON object"})
                )
                continue
            response = session.handle_command(command)
            await websocket.send_text(json.dumps(response, default=str))
    except WebSocketDisconnect:
        pass
    finally:
        session.remove_client(websocket)
        # Clean up idle sessions (no clients left)
        if not session.clients:
            session.stop()
            # Only evict our own entry: a newer session may already have been
            # registered under the same id after this one was emptied.
            if _sessions.get(session_id) is session:
                del _sessions[session_id]
| # --------------------------------------------------------------------------- | |
| # WebRTC signaling relay (phone-as-microphone) | |
| # --------------------------------------------------------------------------- | |
| # | |
| # A lightweight, stateless relay: two peers (a desktop "receiver" and a phone | |
| # "sender") join a room keyed by a one-time token and forward SDP / ICE | |
| # messages to each other. No media passes through the server. | |
# Signaling rooms: one-time token -> sockets currently joined to that room.
_signal_rooms: Dict[str, Set[WebSocket]] = {}


async def _signal_send(ws: WebSocket, payload: dict) -> None:
    """Best-effort send of *payload* as JSON text over *ws*.

    Any exception raised while serialising or sending is swallowed; the
    caller is never told whether delivery succeeded.
    """
    try:
        encoded = json.dumps(payload)
        await ws.send_text(encoded)
    except Exception:
        # Deliberately ignored: the peer may already be gone.
        return
async def signaling_endpoint(websocket: WebSocket, token: str) -> None:
    """Relay signaling text between the (at most two) peers sharing *token*.

    A socket arriving at a room that already holds two peers is sent
    ``{"type": "full"}`` and closed. Otherwise it joins, is told how many
    peers were already present, and every text frame it sends afterwards is
    forwarded unchanged to the other member(s). On exit the remaining members
    receive ``peer-left``; an emptied room is dropped from the registry.
    """
    await websocket.accept()
    members = _signal_rooms.setdefault(token, set())

    # Room already has both peers; reject the third.
    if len(members) >= 2:
        await _signal_send(websocket, {"type": "full"})
        await websocket.close()
        return

    members.add(websocket)

    # Tell the newcomer how many peers were already present, and notify others.
    already_present = len(members) - 1
    await _signal_send(websocket, {"type": "joined", "peers": already_present})
    for other in [m for m in members if m is not websocket]:
        await _signal_send(other, {"type": "peer-joined"})

    try:
        while True:
            frame = await websocket.receive_text()
            # Relay verbatim to the other peer(s) in the room.
            recipients = [m for m in members if m is not websocket]
            for other in recipients:
                try:
                    await other.send_text(frame)
                except Exception:
                    pass
    except WebSocketDisconnect:
        pass
    finally:
        members.discard(websocket)
        if not members:
            _signal_rooms.pop(token, None)
        else:
            for other in list(members):
                await _signal_send(other, {"type": "peer-left"})
| # --------------------------------------------------------------------------- | |
| # Multi-party room mesh signaling (real participant audio) | |
| # --------------------------------------------------------------------------- | |
| # | |
| # Each participant joins a room and is assigned a peer_id. The server keeps a | |
| # roster per room and relays SDP/ICE messages either to a specific target peer | |
| # (`to`) or broadcasts control events (mic state, meeting ended) to everyone. | |
| # Audio itself flows peer-to-peer over WebRTC; only signaling passes here. | |
class _RoomPeer:
    """Roster entry for one mesh-room participant: its socket plus the uid and name it supplied."""

    def __init__(self, ws: WebSocket, uid: str, name: str):
        self.ws, self.uid, self.name = ws, uid, name


# Mesh rooms: room_id -> {peer_id -> _RoomPeer}.
_rooms: Dict[str, Dict[str, _RoomPeer]] = {}
async def room_mesh_endpoint(websocket: WebSocket, room_id: str) -> None:
    """Signaling channel for one participant of a multi-party mesh room.

    The participant gets a fresh ``peer_id``, is sent a ``welcome`` message
    listing the peers already in the room, and is announced to those peers
    with ``peer-join``. Afterwards each inbound JSON object is either

    * forwarded to the peer named in ``to`` (with ``from`` set to the
      sender's peer_id), or
    * when ``type`` is ``"broadcast"``, its ``data`` object is sent to every
      other peer with ``from`` and ``uid`` filled in.

    Messages that are not JSON objects, or whose fields have the wrong type,
    are ignored. On exit the peer is removed, the rest of the room receives
    ``peer-leave``, and an emptied room is dropped from the registry.
    """
    await websocket.accept()
    uid = websocket.query_params.get("uid", "")
    name = websocket.query_params.get("name", "")
    peer_id = uuid.uuid4().hex
    peers = _rooms.setdefault(room_id, {})

    # Snapshot the roster and register the newcomer with no await in between.
    # Otherwise two participants joining at the same time each get a roster
    # without the other and neither is announced to the other.
    existing = list(peers.items())
    peers[peer_id] = _RoomPeer(websocket, uid, name)

    try:
        # Greet the newcomer with the roster as it was when they joined.
        await _signal_send(
            websocket,
            {
                "type": "welcome",
                "peer_id": peer_id,
                "peers": [
                    {"peer_id": pid, "uid": p.uid, "name": p.name}
                    for pid, p in existing
                ],
            },
        )
        # Announce over the snapshot, never the live dict: the dict can change
        # size while we await a send.
        for pid, p in existing:
            if pid in peers:
                await _signal_send(
                    p.ws,
                    {"type": "peer-join", "peer_id": peer_id, "uid": uid, "name": name},
                )

        while True:
            raw = await websocket.receive_text()
            try:
                msg = json.loads(raw)
            except json.JSONDecodeError:
                continue
            if not isinstance(msg, dict):
                # Valid JSON that is not an object carries no routing info.
                continue

            target = msg.get("to")
            if isinstance(target, str) and target in peers:
                # Directed SDP/ICE message → forward to the named peer.
                msg["from"] = peer_id
                await _signal_send(peers[target].ws, msg)
            elif msg.get("type") == "broadcast":
                # Control event (mic-state / ended) → everyone else in the room.
                body = msg.get("data", {})
                if not isinstance(body, dict):
                    continue
                data = dict(body)
                data["from"] = peer_id
                data["uid"] = uid
                for pid, p in list(peers.items()):
                    if pid != peer_id:
                        await _signal_send(p.ws, data)
    except WebSocketDisconnect:
        pass
    finally:
        peers.pop(peer_id, None)
        for p in list(peers.values()):
            await _signal_send(p.ws, {"type": "peer-leave", "peer_id": peer_id})
        # The sends above yield control, so by now a new room may have been
        # registered under the same id; only remove the dict we were using.
        if not peers and _rooms.get(room_id) is peers:
            del _rooms[room_id]
| # --------------------------------------------------------------------------- | |
| # Private helpers | |
| # --------------------------------------------------------------------------- | |
def _build_mic_payload(mic_id: str, state: dict, result: dict) -> dict:
    """Build the per-microphone broadcast entry from stream state and a DSP result.

    Values found in ``result`` (and its nested ``"vad"`` dict) take precedence;
    anything missing falls back to ``state`` and then to a fixed default.
    Numeric fields are rounded for transport.
    """
    vad_info = result.get("vad", {})

    level = vad_info.get("db_level", state.get("db_level", -100.0))
    speech_flag = vad_info.get("is_speech", state.get("is_speech_active", False))
    speech_prob = vad_info.get(
        "speech_probability", state.get("speech_probability", 0.0)
    )
    gain = result.get("gain", state.get("gain", 1.0))

    # Insertion order is the order of keys in the serialised message.
    payload = {"mic_id": mic_id, "name": state.get("name", mic_id)}
    payload["db_level"] = round(level, 2)
    payload["is_speech_active"] = speech_flag
    payload["speech_probability"] = round(speech_prob, 3)
    payload["gain"] = round(gain, 3)
    payload["is_muted"] = state.get("is_muted", False)
    payload["priority"] = state.get("priority", 5)
    payload["is_chairman"] = state.get("is_chairman", False)
    payload["suppressed"] = result.get("suppressed", False)
    payload["zcr"] = round(vad_info.get("zcr", 0.0), 4)
    payload["band_energy_ratio"] = round(vad_info.get("band_energy_ratio", 0.0), 4)
    return payload
def _find_active_speaker(mic_updates: dict) -> Optional[str]:
    """Pick the loudest microphone that is speaking, not suppressed and not muted.

    Returns its mic_id, or None when no microphone qualifies. When several
    share the highest ``db_level`` the first one in iteration order wins.
    """
    winner: Optional[str] = None
    winner_level = None
    for mic_id, info in mic_updates.items():
        if not info.get("is_speech_active"):
            continue
        if info.get("suppressed") or info.get("is_muted"):
            continue
        level = info.get("db_level", -100.0)
        # Strictly greater, so an earlier microphone keeps the lead on a tie.
        if winner is None or level > winner_level:
            winner, winner_level = mic_id, level
    return winner
def _generate_health_metrics() -> dict:
    """Return randomly generated, plausible-looking system health figures for the dashboard."""
    # (metric name, lower bound, upper bound, decimal places) - drawn in this order.
    metric_specs = (
        ("cpu_usage", 8.0, 35.0, 1),
        ("memory_usage", 40.0, 70.0, 1),
        ("latency_ms", 12.0, 28.0, 1),
        ("buffer_health", 92.0, 100.0, 1),
        ("packet_loss", 0.0, 0.5, 2),
    )
    return {
        metric: round(random.uniform(low, high), places)
        for metric, low, high, places in metric_specs
    }