from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse from pathlib import Path import uvicorn import json import uuid import logging logging.basicConfig(level=logging.INFO) log = logging.getLogger("intercom") app = FastAPI() rooms = {} ICE_SERVERS = [ {"urls": "stun:stun.l.google.com:19302"}, {"urls": "stun:stun1.l.google.com:19302"}, {"urls": "stun:stun2.l.google.com:19302"}, {"urls": "stun:stun3.l.google.com:19302"}, {"urls": "turn:openrelay.metered.ca:80", "username": "openrelayproject", "credential": "openrelayproject"}, {"urls": "turn:openrelay.metered.ca:443", "username": "openrelayproject", "credential": "openrelayproject"}, {"urls": "turn:openrelay.metered.ca:443?transport=tcp", "username": "openrelayproject", "credential": "openrelayproject"}, ] @app.get("/") async def index(): return HTMLResponse(Path("index.html").read_text()) @app.get("/ping") async def ping(): return {"status": "alive"} @app.get("/ice") async def get_ice(): return {"iceServers": ICE_SERVERS} @app.websocket("/ws/{room}") async def ws_endpoint(websocket: WebSocket, room: str): await websocket.accept() pid = uuid.uuid4().hex[:8] if room not in rooms: rooms[room] = {"peers": {}, "broadcaster": None} r = rooms[room] r["peers"][pid] = websocket log.info(f"[+] {pid} joined '{room}' ({len(r['peers'])} peers)") await websocket.send_text(json.dumps({"type": "assigned", "id": pid})) await broadcast_state(room) try: while True: msg = await websocket.receive() if "text" in msg: await handle_text(room, pid, json.loads(msg["text"])) elif "bytes" in msg: raw = msg["bytes"] if len(raw) < 2: continue tag = raw[0] if tag == 2 and r["broadcaster"] != pid: continue await relay_bytes(room, pid, raw) except (WebSocketDisconnect, Exception) as e: log.info(f"[-] {pid} left '{room}': {type(e).__name__}") r["peers"].pop(pid, None) if r["broadcaster"] == pid: r["broadcaster"] = None if not r["peers"]: rooms.pop(room, None) log.info(f"[x] Room '{room}' removed (empty)") else: await notify_leave(room, pid) await broadcast_state(room) async def handle_text(room, sender, data): r = rooms.get(room) if not r: return t = data.get("type") if t == "set_broadcaster": old = r["broadcaster"] r["broadcaster"] = sender log.info(f"[B] {sender} broadcasting in '{room}'") if old and old != sender and old in r["peers"]: try: await r["peers"][old].send_text(json.dumps({"type": "force_viewer"})) except: pass await broadcast_state(room) elif t == "set_viewer": if r["broadcaster"] == sender: r["broadcaster"] = None log.info(f"[V] {sender} stopped broadcasting in '{room}'") await broadcast_state(room) elif t in ("offer", "answer", "ice", "negotiate"): target = data.get("to") if not target: return if target in r["peers"]: fwd = dict(data) fwd["from"] = sender try: await r["peers"][target].send_text(json.dumps(fwd)) log.info(f"[S] {t} from {sender} -> {target}") except Exception as e: log.warning(f"[!] Signal fail {t} {sender}->{target}: {e}") elif t == "p2p_failed": log.info(f"[!] P2P failed for {sender} in '{room}', falling back to relay") async def relay_bytes(room, sender, data): r = rooms.get(room) if not r: return dead = [] for pid, ws in list(r["peers"].items()): if pid != sender: try: await ws.send_bytes(data) except: dead.append(pid) for d in dead: r["peers"].pop(d, None) log.info(f"[-] {d} dead connection removed from '{room}'") async def broadcast_state(room): r = rooms.get(room) if not r: return dead = [] for pid, ws in list(r["peers"].items()): try: await ws.send_text(json.dumps({ "type": "state", "id": pid, "peers": list(r["peers"].keys()), "peer_count": len(r["peers"]), "broadcaster": r["broadcaster"], "is_broadcaster": pid == r["broadcaster"] })) except: dead.append(pid) for d in dead: r["peers"].pop(d, None) async def notify_leave(room, pid): r = rooms.get(room) if not r: return for p, ws in list(r["peers"].items()): try: await ws.send_text(json.dumps({"type": "peer_left", "id": pid})) except: pass if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)