"""
WebSocket server for real-time audio processing simulation.

Each WebSocket session maintains:
- A StreamRouter that simulates multiple microphone streams through the DSP pipeline
- A broadcast loop that pushes audio_update messages every 100 ms
- Command handling for mute / priority / chairman changes
"""

import asyncio
import json
import logging
import random
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional, Set

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

from audio_engine.stream_router import StreamRouter

logger = logging.getLogger(__name__)

router = APIRouter(tags=["WebSocket"])

# ---------------------------------------------------------------------------
# Default microphone roster used when a session is first created
# ---------------------------------------------------------------------------

_DEFAULT_MICROPHONES = [
    {"mic_id": "mic_1", "name": "Participant 1", "priority": 5, "is_chairman": False},
    {"mic_id": "mic_2", "name": "Participant 2", "priority": 5, "is_chairman": False},
    {"mic_id": "mic_3", "name": "Participant 3", "priority": 4, "is_chairman": False},
    {"mic_id": "mic_4", "name": "Chairman", "priority": 9, "is_chairman": True},
]

# ---------------------------------------------------------------------------
# In-memory session registry
# ---------------------------------------------------------------------------


class AudioSession:
    """
    One logical audio session (e.g. a meeting room).

    Multiple WebSocket clients can subscribe to the same session and all
    receive identical broadcast updates.
    """

    def __init__(self, session_id: str):
        """Create the session and register the default microphone roster."""
        self.session_id = session_id
        self.router = StreamRouter(sample_rate=16000, frame_size=512)
        self.clients: Set[WebSocket] = set()
        self._sim_task: Optional[asyncio.Task] = None
        self._running = False

        # Speech-pattern state for a more realistic simulation
        self._speech_states: Dict[str, dict] = {}

        for mic_def in _DEFAULT_MICROPHONES:
            self.router.add_stream(
                mic_id=mic_def["mic_id"],
                name=mic_def["name"],
                priority=mic_def["priority"],
                is_chairman=mic_def["is_chairman"],
            )
            self._speech_states[mic_def["mic_id"]] = self._new_speech_state()

    @staticmethod
    def _new_speech_state() -> dict:
        """Return the initial speech/silence state: silent for 5-20 ticks."""
        return {
            "speaking": False,
            "ticks_remaining": 0,
            "silence_remaining": random.randint(5, 20),
        }

    # ------------------------------------------------------------------
    # Client management
    # ------------------------------------------------------------------

    def add_client(self, ws: WebSocket) -> None:
        """Subscribe *ws* to this session's broadcasts."""
        self.clients.add(ws)

    def remove_client(self, ws: WebSocket) -> None:
        """Unsubscribe *ws*; a no-op when it is not subscribed."""
        self.clients.discard(ws)

    # ------------------------------------------------------------------
    # Simulation loop
    # ------------------------------------------------------------------

    def start(self) -> None:
        """Start the simulation task unless the session is already running."""
        if not self._running:
            self._running = True
            self._sim_task = asyncio.create_task(self._simulation_loop())

    def stop(self) -> None:
        """Clear the running flag and cancel the simulation task if still alive."""
        self._running = False
        if self._sim_task and not self._sim_task.done():
            self._sim_task.cancel()

    async def _broadcast(self, message: str) -> None:
        """Send *message* to every client and drop the ones whose send fails."""
        dead: Set[WebSocket] = set()
        # Iterate over a snapshot: clients may join or leave while we await.
        for ws in list(self.clients):
            try:
                await ws.send_text(message)
            except Exception:
                dead.add(ws)
        self.clients -= dead

    async def _simulation_loop(self) -> None:
        """Run one tick roughly every 100 ms and broadcast it to all clients."""
        while self._running:
            try:
                payload = await self._tick()
                if self.clients:
                    await self._broadcast(json.dumps(payload, default=str))
            except asyncio.CancelledError:
                break
            except Exception:
                # Keep the loop alive, but leave a trace instead of hiding
                # the failure entirely.
                logger.exception(
                    "Simulation tick failed for session %s", self.session_id
                )
            await asyncio.sleep(0.1)  # 100 ms tick

    async def _tick(self) -> dict:
        """Advance the simulation by one tick and return the broadcast payload."""
        microphone_updates: dict = {}
        all_states = self.router.get_stream_states()

        for mic_id, sim in self._speech_states.items():
            state = all_states.get(mic_id, {})

            if state.get("is_muted", False):
                # Muted – no audio processing needed
                microphone_updates[mic_id] = _build_mic_payload(mic_id, state, {})
                continue

            # Advance the speech / silence FSM
            if sim["speaking"]:
                sim["ticks_remaining"] -= 1
                if sim["ticks_remaining"] <= 0:
                    sim["speaking"] = False
                    sim["silence_remaining"] = random.randint(5, 30)
            else:
                sim["silence_remaining"] -= 1
                if sim["silence_remaining"] <= 0:
                    sim["speaking"] = True
                    sim["ticks_remaining"] = random.randint(10, 40)

            has_speech = sim["speaking"]
            if has_speech:
                db_level = random.uniform(-32.0, -25.0)
            else:
                db_level = random.uniform(-70.0, -55.0)

            frame = self.router.simulate_audio_frame(
                mic_id, db_level=db_level, has_speech=has_speech
            )
            result = self.router.process_frame(mic_id, frame)
            # `state` is the snapshot taken before this frame was processed;
            # per-frame values come from `result`.
            microphone_updates[mic_id] = _build_mic_payload(mic_id, state, result)

        return {
            "type": "audio_update",
            "session_id": self.session_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "microphones": microphone_updates,
            "active_speaker": _find_active_speaker(microphone_updates),
            "system_health": _generate_health_metrics(),
        }

    # ------------------------------------------------------------------
    # Command handlers
    # ------------------------------------------------------------------

    def handle_command(self, command: dict) -> dict:
        """
        Dispatch an inbound control command.

        Recognised ``type`` values: ``mute``, ``set_priority``,
        ``set_chairman``, ``add_microphone``, ``remove_microphone`` and
        ``get_states``.

        Returns an ack/error dict. Malformed input (a non-object command, a
        missing ``mic_id``, a non-integer ``priority``) yields an error dict
        rather than an exception.
        """
        if not isinstance(command, dict):
            # json.loads may yield a list, string or number; none has .get().
            return {"type": "error", "message": "Command must be a JSON object"}

        cmd_type = command.get("type")
        mic_id: Optional[str] = command.get("mic_id")

        if cmd_type == "mute":
            if not mic_id:
                return {"type": "error", "message": "mic_id required for mute"}
            muted: bool = command.get("muted", True)
            self.router.set_microphone_muted(mic_id, muted)
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id, "muted": muted}

        if cmd_type == "set_priority":
            priority = command.get("priority")
            if not mic_id or priority is None:
                return {"type": "error", "message": "mic_id and priority required"}
            try:
                priority = int(priority)
            except (TypeError, ValueError):
                return {"type": "error", "message": "priority must be an integer 1–10"}
            # NOTE(review): the 1–10 range named in the message is not
            # enforced here; confirm whether StreamRouter validates it.
            self.router.set_microphone_priority(mic_id, priority)
            return {
                "type": "ack",
                "command": cmd_type,
                "mic_id": mic_id,
                "priority": priority,
            }

        if cmd_type == "set_chairman":
            if not mic_id:
                return {"type": "error", "message": "mic_id required for set_chairman"}
            self.router.set_chairman(mic_id)
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id}

        if cmd_type == "add_microphone":
            if not mic_id:
                return {"type": "error", "message": "mic_id required"}
            try:
                new_priority = int(command.get("priority", 5))
            except (TypeError, ValueError):
                # Same response as set_priority for a non-integer priority.
                return {"type": "error", "message": "priority must be an integer 1–10"}
            name: str = command.get("name", f"Mic {mic_id}")
            is_chairman = bool(command.get("is_chairman", False))
            added = self.router.add_stream(mic_id, name, new_priority, is_chairman)
            if added:
                self._speech_states[mic_id] = self._new_speech_state()
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id, "added": added}

        if cmd_type == "remove_microphone":
            if not mic_id:
                return {"type": "error", "message": "mic_id required"}
            self.router.remove_stream(mic_id)
            self._speech_states.pop(mic_id, None)
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id}

        if cmd_type == "get_states":
            return {
                "type": "states",
                "microphones": self.router.get_stream_states(),
            }

        return {"type": "error", "message": f"Unknown command type: {cmd_type!r}"}
# ---------------------------------------------------------------------------
# Session registry (keyed by session_id)
# ---------------------------------------------------------------------------

_sessions: Dict[str, AudioSession] = {}


def _get_or_create_session(session_id: str) -> AudioSession:
    """Return the session for *session_id*, creating it on first use.

    The session is (re)started on every call; ``AudioSession.start`` does
    nothing when the session is already running.
    """
    if session_id not in _sessions:
        _sessions[session_id] = AudioSession(session_id)
    session = _sessions[session_id]
    session.start()
    return session


# ---------------------------------------------------------------------------
# WebSocket endpoint
# ---------------------------------------------------------------------------


@router.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str) -> None:
    """Subscribe a client to an audio session and serve its control commands.

    The client is greeted with the current microphone states. Each text frame
    it sends afterwards is parsed as JSON and passed to
    ``AudioSession.handle_command``, and the resulting dict is sent back.
    When the last client leaves, the session is stopped and removed from the
    registry.
    """
    await websocket.accept()
    session = _get_or_create_session(session_id)
    session.add_client(websocket)

    try:
        # Greet the new client with current microphone states. This runs
        # inside the try so that a failed greeting still unregisters the
        # client and lets an idle session be stopped.
        await websocket.send_text(
            json.dumps(
                {
                    "type": "connected",
                    "session_id": session_id,
                    "microphones": session.router.get_stream_states(),
                    "message": "Connected to SD-MMMS audio session",
                },
                default=str,
            )
        )

        while True:
            raw = await websocket.receive_text()
            try:
                command = json.loads(raw)
            except json.JSONDecodeError:
                await websocket.send_text(
                    json.dumps({"type": "error", "message": "Invalid JSON"})
                )
                continue

            if not isinstance(command, dict):
                # Valid JSON that is not an object cannot be a command.
                await websocket.send_text(
                    json.dumps(
                        {"type": "error", "message": "Command must be a JSON object"}
                    )
                )
                continue

            response = session.handle_command(command)
            await websocket.send_text(json.dumps(response, default=str))

    except WebSocketDisconnect:
        pass
    finally:
        session.remove_client(websocket)
        # Clean up idle sessions (no clients left)
        if not session.clients:
            session.stop()
            _sessions.pop(session_id, None)


# ---------------------------------------------------------------------------
# WebRTC signaling relay (phone-as-microphone)
# ---------------------------------------------------------------------------
#
# A lightweight, stateless relay: two peers (a desktop "receiver" and a phone
# "sender") join a room keyed by a one-time token and forward SDP / ICE
# messages to each other.
# No media passes through the server.

_signal_rooms: Dict[str, Set[WebSocket]] = {}


async def _signal_send(ws: WebSocket, payload: dict) -> None:
    """Send *payload* to *ws* as JSON text, ignoring any send failure."""
    try:
        await ws.send_text(json.dumps(payload))
    except Exception:
        # Deliberately best-effort: a failed send must not break the caller.
        pass


@router.websocket("/ws/signal/{token}")
async def signaling_endpoint(websocket: WebSocket, token: str) -> None:
    """Relay text frames between the (at most two) peers sharing *token*.

    A third connection receives ``{"type": "full"}`` and is closed. Every
    other connection is told how many peers were already present
    (``joined``), existing peers are notified (``peer-joined``), and each
    incoming text frame is forwarded verbatim to the other peer(s). On exit
    the remaining peers get ``peer-left``; an empty room is removed.
    """
    await websocket.accept()
    room = _signal_rooms.setdefault(token, set())
    if len(room) >= 2:
        # Room already has both peers; reject the third.
        await _signal_send(websocket, {"type": "full"})
        try:
            await websocket.close()
        except Exception:
            # The client may already be gone; nothing is left to clean up.
            pass
        return

    room.add(websocket)
    # From here on everything runs under try/finally so the socket always
    # leaves the room, even if the handler is interrupted while the join
    # notifications are still being sent.
    try:
        # Tell the newcomer how many peers were already present, and notify others.
        await _signal_send(websocket, {"type": "joined", "peers": len(room) - 1})
        for peer in list(room):
            if peer is not websocket:
                await _signal_send(peer, {"type": "peer-joined"})

        while True:
            raw = await websocket.receive_text()
            # Relay verbatim to the other peer(s) in the room.
            for peer in list(room):
                if peer is websocket:
                    continue
                try:
                    await peer.send_text(raw)
                except Exception:
                    # Best-effort relay; the failing peer's own handler is
                    # responsible for removing it from the room.
                    pass
    except WebSocketDisconnect:
        pass
    finally:
        room.discard(websocket)
        if room:
            for peer in list(room):
                await _signal_send(peer, {"type": "peer-left"})
        else:
            _signal_rooms.pop(token, None)


# ---------------------------------------------------------------------------
# Multi-party room mesh signaling (real participant audio)
# ---------------------------------------------------------------------------
#
# Each participant joins a room and is assigned a peer_id. The server keeps a
# roster per room and relays SDP/ICE messages either to a specific target peer
# (`to`) or broadcasts control events (mic state, meeting ended) to everyone.
# Audio itself flows peer-to-peer over WebRTC; only signaling passes here.
class _RoomPeer:
    """Roster entry for one room participant: its socket, uid and display name."""

    def __init__(self, ws: WebSocket, uid: str, name: str):
        self.ws = ws
        self.uid = uid
        self.name = name


# room_id -> {peer_id -> _RoomPeer}
_rooms: Dict[str, Dict[str, _RoomPeer]] = {}


@router.websocket("/ws/room/{room_id}")
async def room_mesh_endpoint(websocket: WebSocket, room_id: str) -> None:
    """Signaling endpoint for one participant of a multi-party room.

    ``uid`` and ``name`` are read from the query string and a fresh
    ``peer_id`` is generated. The newcomer receives a ``welcome`` message
    listing the peers that were already present, and those peers receive
    ``peer-join``. Afterwards each JSON object received is either forwarded
    to the peer named in ``to`` (with ``from`` set to the sender), or, when
    its ``type`` is ``"broadcast"``, its ``data`` object is sent to every
    other peer with ``from`` and ``uid`` added. Anything else is ignored.
    On exit the remaining peers get ``peer-leave`` and an empty room is
    removed.
    """
    await websocket.accept()
    uid = websocket.query_params.get("uid", "")
    name = websocket.query_params.get("name", "")
    peer_id = uuid.uuid4().hex

    peers = _rooms.setdefault(room_id, {})
    # Snapshot the roster and register in one step, with no await in between.
    # Otherwise two participants joining at the same time would each see a
    # roster without the other and never be introduced.
    existing = list(peers.items())
    peers[peer_id] = _RoomPeer(websocket, uid, name)

    try:
        # Greet the newcomer with the pre-existing roster, then announce it.
        await _signal_send(
            websocket,
            {
                "type": "welcome",
                "peer_id": peer_id,
                "peers": [
                    {"peer_id": pid, "uid": p.uid, "name": p.name}
                    for pid, p in existing
                ],
            },
        )
        for _pid, p in existing:
            await _signal_send(
                p.ws,
                {"type": "peer-join", "peer_id": peer_id, "uid": uid, "name": name},
            )

        while True:
            raw = await websocket.receive_text()
            try:
                msg = json.loads(raw)
            except json.JSONDecodeError:
                continue
            if not isinstance(msg, dict):
                # Valid JSON but not an object: nothing to route.
                continue

            target = msg.get("to")
            # peer_ids are strings; the type check also keeps an unhashable
            # `to` (e.g. a list) from raising in the membership test.
            if isinstance(target, str) and target in peers:
                # Directed SDP/ICE message → forward to the named peer.
                msg["from"] = peer_id
                await _signal_send(peers[target].ws, msg)
            elif msg.get("type") == "broadcast":
                # Control event (mic-state / ended) → everyone else in the room.
                payload = msg.get("data", {})
                if not isinstance(payload, dict):
                    continue
                data = dict(payload)
                data["from"] = peer_id
                data["uid"] = uid
                for pid, p in list(peers.items()):
                    if pid != peer_id:
                        await _signal_send(p.ws, data)
    except WebSocketDisconnect:
        pass
    finally:
        peers.pop(peer_id, None)
        for p in list(peers.values()):
            await _signal_send(p.ws, {"type": "peer-leave", "peer_id": peer_id})
        # Only drop the registry entry if it is still this room's roster: the
        # room may have been emptied, removed and recreated under the same id
        # while the notifications above were being sent.
        if not peers and _rooms.get(room_id) is peers:
            del _rooms[room_id]


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------


def _build_mic_payload(mic_id: str, state: dict, result: dict) -> dict:
    """Merge stream state with a DSP result into the per-microphone payload.

    Values under ``result["vad"]`` and ``result["gain"]`` win when present;
    otherwise the matching key from *state* is used, then a fixed default.
    Numeric fields are rounded for transport.
    """
    vad = result.get("vad", {})
    return {
        "mic_id": mic_id,
        "name": state.get("name", mic_id),
        "db_level": round(vad.get("db_level", state.get("db_level", -100.0)), 2),
        "is_speech_active": vad.get("is_speech", state.get("is_speech_active", False)),
        "speech_probability": round(
            vad.get("speech_probability", state.get("speech_probability", 0.0)), 3
        ),
        "gain": round(result.get("gain", state.get("gain", 1.0)), 3),
        "is_muted": state.get("is_muted", False),
        "priority": state.get("priority", 5),
        "is_chairman": state.get("is_chairman", False),
        "suppressed": result.get("suppressed", False),
        "zcr": round(vad.get("zcr", 0.0), 4),
        "band_energy_ratio": round(vad.get("band_energy_ratio", 0.0), 4),
    }


def _find_active_speaker(mic_updates: dict) -> Optional[str]:
    """Return the mic_id of the loudest active (non-suppressed) speaker, or None."""
    candidates = [
        (mid, data)
        for mid, data in mic_updates.items()
        if data.get("is_speech_active")
        and not data.get("suppressed")
        and not data.get("is_muted")
    ]
    if not candidates:
        return None
    # Loudest = highest db_level; on a tie the first candidate wins.
    return max(candidates, key=lambda x: x[1].get("db_level", -100.0))[0]


def _generate_health_metrics() -> dict:
    """Produce realistic-looking system health metrics for the dashboard.

    Every value is drawn at random from a fixed range; nothing is measured.
    """
    return {
        "cpu_usage": round(random.uniform(8.0, 35.0), 1),
        "memory_usage": round(random.uniform(40.0, 70.0), 1),
        "latency_ms": round(random.uniform(12.0, 28.0), 1),
        "buffer_health": round(random.uniform(92.0, 100.0), 1),
        "packet_loss": round(random.uniform(0.0, 0.5), 2),
    }