SDNmeeting / api /websocket_server.py
Che237
Deploy SD-MMMS FastAPI backend as Docker Space
900edd0
Raw
History Blame Contribute Delete
17.5 kB
"""
WebSocket server for real-time audio processing simulation.
Each WebSocket session maintains:
- A StreamRouter that simulates multiple microphone streams through the DSP pipeline
- A broadcast loop that pushes audio_update messages every 100 ms
- Command handling for mute / priority / chairman changes
"""
import asyncio
import json
import logging
import random
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional, Set

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

from audio_engine.stream_router import StreamRouter
router = APIRouter(tags=["WebSocket"])
# ---------------------------------------------------------------------------
# Default microphone roster used when a session is first created
# ---------------------------------------------------------------------------
# Roster registered on every new AudioSession; each entry supplies the keyword
# arguments passed to StreamRouter.add_stream.
_DEFAULT_MICROPHONES = [
    {
        "mic_id": "mic_1",
        "name": "Participant 1",
        "priority": 5,
        "is_chairman": False,
    },
    {
        "mic_id": "mic_2",
        "name": "Participant 2",
        "priority": 5,
        "is_chairman": False,
    },
    {
        "mic_id": "mic_3",
        "name": "Participant 3",
        "priority": 4,
        "is_chairman": False,
    },
    {
        "mic_id": "mic_4",
        "name": "Chairman",
        "priority": 9,
        "is_chairman": True,
    },
]
# ---------------------------------------------------------------------------
# In-memory session registry
# ---------------------------------------------------------------------------
class AudioSession:
    """
    One logical audio session (e.g. a meeting room).

    Multiple WebSocket clients can subscribe to the same session and all
    receive identical broadcast updates.

    The session owns a StreamRouter pre-populated with the default microphone
    roster, a per-microphone speech/silence state machine that drives the
    simulated audio, and a background task that ticks every 100 ms.
    """

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.router = StreamRouter(sample_rate=16000, frame_size=512)
        self.clients: Set[WebSocket] = set()
        self._sim_task: Optional[asyncio.Task] = None
        self._running = False
        # Speech-pattern state for a more realistic simulation
        self._speech_states: Dict[str, dict] = {}
        for mic_def in _DEFAULT_MICROPHONES:
            self.router.add_stream(
                mic_id=mic_def["mic_id"],
                name=mic_def["name"],
                priority=mic_def["priority"],
                is_chairman=mic_def["is_chairman"],
            )
            self._speech_states[mic_def["mic_id"]] = self._new_speech_state()

    @staticmethod
    def _new_speech_state() -> dict:
        """Return the initial FSM state for a microphone: silent for 5-20 ticks."""
        return {
            "speaking": False,
            "ticks_remaining": 0,
            "silence_remaining": random.randint(5, 20),
        }

    # ------------------------------------------------------------------
    # Client management
    # ------------------------------------------------------------------
    def add_client(self, ws: WebSocket) -> None:
        """Subscribe *ws* to this session's broadcasts."""
        self.clients.add(ws)

    def remove_client(self, ws: WebSocket) -> None:
        """Unsubscribe *ws*; a no-op when it is not subscribed."""
        self.clients.discard(ws)

    # ------------------------------------------------------------------
    # Simulation loop
    # ------------------------------------------------------------------
    def start(self) -> None:
        """Start the background simulation task if it is not already running."""
        if not self._running:
            self._running = True
            self._sim_task = asyncio.create_task(self._simulation_loop())

    def stop(self) -> None:
        """Stop the simulation and cancel the background task if still active."""
        self._running = False
        if self._sim_task and not self._sim_task.done():
            self._sim_task.cancel()

    async def _simulation_loop(self) -> None:
        """Simulate audio frames at 10 Hz and broadcast results to all clients."""
        # Tracks whether the previous tick failed so that a persistent fault is
        # logged once per failure streak rather than ten times a second.
        tick_failing = False
        while self._running:
            try:
                payload = await self._tick()
                if self.clients:
                    message = json.dumps(payload, default=str)
                    dead: Set[WebSocket] = set()
                    # Iterate a snapshot: clients may join or leave while we await.
                    for ws in list(self.clients):
                        try:
                            await ws.send_text(message)
                        except Exception:
                            # A failed send marks the client for removal.
                            dead.add(ws)
                    self.clients -= dead
                tick_failing = False
            except asyncio.CancelledError:
                break
            except Exception:
                # Keep the loop alive, but surface the fault instead of hiding it.
                if not tick_failing:
                    logging.getLogger(__name__).exception(
                        "Simulation tick failed for session %s", self.session_id
                    )
                    tick_failing = True
            await asyncio.sleep(0.1)  # 100 ms tick

    async def _tick(self) -> dict:
        """Advance the simulation by one tick and return the broadcast payload."""
        microphone_updates: dict = {}
        all_states = self.router.get_stream_states()
        for mic_id, sim in self._speech_states.items():
            state = all_states.get(mic_id, {})
            if state.get("is_muted", False):
                # Muted – no audio processing needed
                microphone_updates[mic_id] = _build_mic_payload(mic_id, state, {})
                continue
            # Advance the speech / silence FSM
            if sim["speaking"]:
                sim["ticks_remaining"] -= 1
                if sim["ticks_remaining"] <= 0:
                    sim["speaking"] = False
                    sim["silence_remaining"] = random.randint(5, 30)
            else:
                sim["silence_remaining"] -= 1
                if sim["silence_remaining"] <= 0:
                    sim["speaking"] = True
                    sim["ticks_remaining"] = random.randint(10, 40)
            has_speech = sim["speaking"]
            db_level = (
                random.uniform(-32.0, -25.0)
                if has_speech
                else random.uniform(-70.0, -55.0)
            )
            frame = self.router.simulate_audio_frame(
                mic_id, db_level=db_level, has_speech=has_speech
            )
            result = self.router.process_frame(mic_id, frame)
            microphone_updates[mic_id] = _build_mic_payload(mic_id, state, result)
        return {
            "type": "audio_update",
            "session_id": self.session_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "microphones": microphone_updates,
            "active_speaker": _find_active_speaker(microphone_updates),
            "system_health": _generate_health_metrics(),
        }

    # ------------------------------------------------------------------
    # Command handlers
    # ------------------------------------------------------------------
    def handle_command(self, command: dict) -> dict:
        """
        Dispatch an inbound control command. Returns an ack/error dict.

        Recognised ``type`` values: ``mute``, ``set_priority``,
        ``set_chairman``, ``add_microphone``, ``remove_microphone`` and
        ``get_states``. Malformed input (a non-object payload, a missing
        ``mic_id``, a non-integer priority, an unknown type) yields an
        ``{"type": "error", ...}`` reply rather than an exception, so one bad
        client message cannot tear down the connection.
        """
        if not isinstance(command, dict):
            # Clients send arbitrary JSON; lists/strings/numbers have no .get().
            return {"type": "error", "message": "Command must be a JSON object"}

        cmd_type = command.get("type")
        mic_id: Optional[str] = command.get("mic_id")

        if cmd_type == "mute":
            muted: bool = command.get("muted", True)
            if not mic_id:
                return {"type": "error", "message": "mic_id required for mute"}
            self.router.set_microphone_muted(mic_id, muted)
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id, "muted": muted}

        if cmd_type == "set_priority":
            priority = command.get("priority")
            if not mic_id or priority is None:
                return {"type": "error", "message": "mic_id and priority required"}
            try:
                priority = int(priority)
            except (TypeError, ValueError):
                return {"type": "error", "message": "priority must be an integer 1–10"}
            self.router.set_microphone_priority(mic_id, priority)
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id, "priority": priority}

        if cmd_type == "set_chairman":
            if not mic_id:
                return {"type": "error", "message": "mic_id required for set_chairman"}
            self.router.set_chairman(mic_id)
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id}

        if cmd_type == "add_microphone":
            # Validate before parsing anything else so a missing mic_id is
            # reported as such even when other fields are also malformed.
            if not mic_id:
                return {"type": "error", "message": "mic_id required"}
            name: str = command.get("name", f"Mic {mic_id}")
            try:
                new_priority = int(command.get("priority", 5))
            except (TypeError, ValueError):
                return {"type": "error", "message": "priority must be an integer 1–10"}
            is_chairman: bool = bool(command.get("is_chairman", False))
            added = self.router.add_stream(mic_id, name, new_priority, is_chairman)
            if added:
                self._speech_states[mic_id] = self._new_speech_state()
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id, "added": added}

        if cmd_type == "remove_microphone":
            if not mic_id:
                return {"type": "error", "message": "mic_id required"}
            self.router.remove_stream(mic_id)
            self._speech_states.pop(mic_id, None)
            return {"type": "ack", "command": cmd_type, "mic_id": mic_id}

        if cmd_type == "get_states":
            return {
                "type": "states",
                "microphones": self.router.get_stream_states(),
            }

        return {"type": "error", "message": f"Unknown command type: {cmd_type!r}"}
# ---------------------------------------------------------------------------
# Session registry (keyed by session_id)
# ---------------------------------------------------------------------------
_sessions: Dict[str, AudioSession] = {}


def _get_or_create_session(session_id: str) -> AudioSession:
    """Return the running session for *session_id*, creating it on first use.

    The session's simulation is (re)started on every call; AudioSession.start
    does nothing when the loop is already running.
    """
    active = _sessions.get(session_id)
    if active is None:
        active = AudioSession(session_id)
        _sessions[session_id] = active
    active.start()
    return active
# ---------------------------------------------------------------------------
# WebSocket endpoint
# ---------------------------------------------------------------------------
@router.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str) -> None:
    """Serve one client of an audio session.

    Accepts the socket, subscribes it to the session named by *session_id*
    (creating and starting the session if needed), greets it with the current
    microphone states, then answers each JSON command with the dict returned
    by ``session.handle_command``. When the client goes away it is
    unsubscribed, and a session left with no clients is stopped and removed
    from the registry.
    """
    await websocket.accept()
    session = _get_or_create_session(session_id)
    session.add_client(websocket)
    try:
        # The greeting is sent inside the try block: if the client has already
        # gone and the send raises, the finally clause must still unsubscribe
        # it and tear down an otherwise idle session.
        await websocket.send_text(
            json.dumps(
                {
                    "type": "connected",
                    "session_id": session_id,
                    "microphones": session.router.get_stream_states(),
                    "message": "Connected to SD-MMMS audio session",
                },
                default=str,
            )
        )
        while True:
            raw = await websocket.receive_text()
            try:
                command = json.loads(raw)
            except json.JSONDecodeError:
                await websocket.send_text(
                    json.dumps({"type": "error", "message": "Invalid JSON"})
                )
                continue
            if not isinstance(command, dict):
                # Valid JSON that is not an object (list, string, number...)
                # cannot be dispatched; reject it instead of crashing.
                await websocket.send_text(
                    json.dumps(
                        {"type": "error", "message": "Command must be a JSON object"}
                    )
                )
                continue
            response = session.handle_command(command)
            await websocket.send_text(json.dumps(response, default=str))
    except WebSocketDisconnect:
        pass
    finally:
        session.remove_client(websocket)
        # Clean up idle sessions (no clients left)
        if not session.clients:
            session.stop()
            _sessions.pop(session_id, None)
# ---------------------------------------------------------------------------
# WebRTC signaling relay (phone-as-microphone)
# ---------------------------------------------------------------------------
#
# A lightweight relay that keeps only in-memory room membership: two peers (a
# desktop "receiver" and a phone "sender") join a room keyed by a one-time
# token, and the server forwards their SDP / ICE messages to each other.
# No media passes through the server.
_signal_rooms: Dict[str, Set[WebSocket]] = {}


async def _signal_send(ws: WebSocket, payload: dict) -> None:
    """Send *payload* to *ws* as JSON text, ignoring any failure.

    Delivery is best-effort: serialization and send errors are both
    swallowed so that one unreachable peer never interrupts the caller.
    """
    try:
        encoded = json.dumps(payload)
        await ws.send_text(encoded)
    except Exception:
        # Deliberately dropped – see docstring.
        return
@router.websocket("/ws/signal/{token}")
async def signaling_endpoint(websocket: WebSocket, token: str) -> None:
    """Relay text messages between the (at most two) peers sharing *token*.

    A third connection to a full room is told ``{"type": "full"}`` and closed.
    Otherwise the newcomer receives ``joined`` with the number of peers that
    were already present, existing peers receive ``peer-joined``, and every
    later text frame is forwarded unchanged to the other member(s). On exit
    the remaining peers receive ``peer-left``; an empty room is dropped.
    """
    await websocket.accept()
    members = _signal_rooms.setdefault(token, set())

    # Room already has both peers; reject the third.
    if len(members) >= 2:
        await _signal_send(websocket, {"type": "full"})
        await websocket.close()
        return

    members.add(websocket)
    already_present = len(members) - 1

    # Tell the newcomer how many peers were already present, and notify others.
    await _signal_send(websocket, {"type": "joined", "peers": already_present})
    for other in [m for m in members if m is not websocket]:
        await _signal_send(other, {"type": "peer-joined"})

    try:
        while True:
            frame = await websocket.receive_text()
            # Relay verbatim to the other peer(s) in the room.
            recipients = [m for m in members if m is not websocket]
            for other in recipients:
                try:
                    await other.send_text(frame)
                except Exception:
                    pass
    except WebSocketDisconnect:
        pass
    finally:
        members.discard(websocket)
        if not members:
            _signal_rooms.pop(token, None)
        else:
            for other in list(members):
                await _signal_send(other, {"type": "peer-left"})
# ---------------------------------------------------------------------------
# Multi-party room mesh signaling (real participant audio)
# ---------------------------------------------------------------------------
#
# Each participant joins a room and is assigned a peer_id. The server keeps a
# roster per room and relays SDP/ICE messages either to a specific target peer
# (`to`) or broadcasts control events (mic state, meeting ended) to everyone.
# Audio itself flows peer-to-peer over WebRTC; only signaling passes here.
class _RoomPeer:
    """Roster entry for one mesh-room participant: socket, user id, display name."""

    def __init__(self, ws: WebSocket, uid: str, name: str):
        self.ws, self.uid, self.name = ws, uid, name
_rooms: Dict[str, Dict[str, _RoomPeer]] = {}


@router.websocket("/ws/room/{room_id}")
async def room_mesh_endpoint(websocket: WebSocket, room_id: str) -> None:
    """Signaling endpoint for a multi-party mesh room.

    The caller is assigned a fresh ``peer_id`` and registered in the roster of
    *room_id* (``uid`` and ``name`` come from the query string). It receives a
    ``welcome`` message listing the peers that were present before it, and
    those peers receive ``peer-join``. Afterwards each JSON object it sends is
    either forwarded to the peer named by ``to`` (stamped with ``from``) or,
    when ``type`` is ``"broadcast"``, its ``data`` is sent to every other peer
    (stamped with ``from`` and ``uid``). Anything else is ignored. On exit the
    peer is removed, the others receive ``peer-leave``, and an empty room is
    dropped from the registry.
    """
    await websocket.accept()
    uid = websocket.query_params.get("uid", "")
    name = websocket.query_params.get("name", "")
    peer_id = uuid.uuid4().hex
    peers = _rooms.setdefault(room_id, {})

    # Snapshot the existing roster and register the newcomer with no await in
    # between. Registering only after the greeting awaits would let a peer
    # that joins concurrently miss this one (and vice versa), and iterating
    # the live dict across awaits can raise "dictionary changed size during
    # iteration" when the roster changes mid-loop.
    existing = list(peers.items())
    peers[peer_id] = _RoomPeer(websocket, uid, name)

    try:
        # Greet the newcomer with the pre-existing roster, then announce it.
        await _signal_send(
            websocket,
            {
                "type": "welcome",
                "peer_id": peer_id,
                "peers": [
                    {"peer_id": pid, "uid": p.uid, "name": p.name}
                    for pid, p in existing
                ],
            },
        )
        for _pid, p in existing:
            await _signal_send(
                p.ws,
                {"type": "peer-join", "peer_id": peer_id, "uid": uid, "name": name},
            )

        while True:
            raw = await websocket.receive_text()
            try:
                msg = json.loads(raw)
            except json.JSONDecodeError:
                continue
            if not isinstance(msg, dict):
                # Valid JSON but not an object: nothing to route.
                continue

            target = msg.get("to")
            # peer ids are strings; the isinstance check also keeps an
            # unhashable ``to`` (list/dict) from raising in the lookup.
            if isinstance(target, str) and target in peers:
                # Directed SDP/ICE message → forward to the named peer.
                msg["from"] = peer_id
                await _signal_send(peers[target].ws, msg)
            elif msg.get("type") == "broadcast":
                # Control event (mic-state / ended) → everyone else in the room.
                try:
                    data = dict(msg.get("data", {}))
                except (TypeError, ValueError):
                    # ``data`` is not mapping-like; drop the malformed event.
                    continue
                data["from"] = peer_id
                data["uid"] = uid
                for pid, p in list(peers.items()):
                    if pid != peer_id:
                        await _signal_send(p.ws, data)
    except WebSocketDisconnect:
        pass
    finally:
        peers.pop(peer_id, None)
        for p in list(peers.values()):
            await _signal_send(p.ws, {"type": "peer-leave", "peer_id": peer_id})
        # While the notifications above were awaited the room may have emptied,
        # been removed by another handler and then re-created under the same
        # id; only remove the registry entry if it is still this roster.
        if not peers and _rooms.get(room_id) is peers:
            _rooms.pop(room_id, None)
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
def _build_mic_payload(mic_id: str, state: dict, result: dict) -> dict:
    """Combine a stream's state with its DSP result into one broadcast entry.

    Values found in ``result`` (and its nested ``"vad"`` dict) take
    precedence; ``state`` supplies the fallback, and a fixed default is used
    when neither has the key. Numeric readings are rounded for display.
    """
    vad_metrics = result.get("vad", {})

    level = vad_metrics.get("db_level", state.get("db_level", -100.0))
    speech_flag = vad_metrics.get("is_speech", state.get("is_speech_active", False))
    probability = vad_metrics.get(
        "speech_probability", state.get("speech_probability", 0.0)
    )
    applied_gain = result.get("gain", state.get("gain", 1.0))

    # Keys are inserted in the order clients receive them.
    payload = {"mic_id": mic_id, "name": state.get("name", mic_id)}
    payload["db_level"] = round(level, 2)
    payload["is_speech_active"] = speech_flag
    payload["speech_probability"] = round(probability, 3)
    payload["gain"] = round(applied_gain, 3)
    payload["is_muted"] = state.get("is_muted", False)
    payload["priority"] = state.get("priority", 5)
    payload["is_chairman"] = state.get("is_chairman", False)
    payload["suppressed"] = result.get("suppressed", False)
    payload["zcr"] = round(vad_metrics.get("zcr", 0.0), 4)
    payload["band_energy_ratio"] = round(vad_metrics.get("band_energy_ratio", 0.0), 4)
    return payload
def _find_active_speaker(mic_updates: dict) -> Optional[str]:
    """Pick the loudest microphone that is speaking, unsuppressed and unmuted.

    Returns that microphone's id, or None when no microphone qualifies. A
    missing ``db_level`` counts as -100.0; on equal levels the entry that
    appears first in *mic_updates* wins.
    """
    eligible_ids = (
        mic_id
        for mic_id, info in mic_updates.items()
        if info.get("is_speech_active")
        and not (info.get("suppressed") or info.get("is_muted"))
    )
    return max(
        eligible_ids,
        key=lambda mic_id: mic_updates[mic_id].get("db_level", -100.0),
        default=None,
    )
def _generate_health_metrics() -> dict:
    """Return randomly generated system-health figures for the dashboard.

    Each metric is drawn uniformly from a fixed range and rounded; nothing is
    measured from the host.
    """
    # (key, low, high, decimal places) – drawn in this order.
    metric_specs = (
        ("cpu_usage", 8.0, 35.0, 1),
        ("memory_usage", 40.0, 70.0, 1),
        ("latency_ms", 12.0, 28.0, 1),
        ("buffer_health", 92.0, 100.0, 1),
        ("packet_loss", 0.0, 0.5, 2),
    )
    return {
        key: round(random.uniform(low, high), places)
        for key, low, high, places in metric_specs
    }