# phone-stream-e2e / main.py
# kylsprt's picture
# Update main.py
# aa34189 verified
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from pathlib import Path
import uvicorn
import json
import uuid
import logging
# Process-wide logging at INFO level; every message in this module goes
# through the "intercom" logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("intercom")
# ASGI application object the route decorators below attach to.
app = FastAPI()
# In-memory room registry, keyed by room name. Each value is a dict of the
# form {"peers": {peer_id: WebSocket}, "broadcaster": peer_id or None}.
rooms = {}
# STUN entries carry only a URL.
_STUN_SERVERS = [
    {"urls": "stun:stun.l.google.com:19302"},
    {"urls": "stun:stun1.l.google.com:19302"},
    {"urls": "stun:stun2.l.google.com:19302"},
    {"urls": "stun:stun3.l.google.com:19302"},
]

# Every TURN entry uses the same username/credential pair.
_TURN_AUTH = {"username": "openrelayproject", "credential": "openrelayproject"}
_TURN_SERVERS = [
    {"urls": turn_url, **_TURN_AUTH}
    for turn_url in (
        "turn:openrelay.metered.ca:80",
        "turn:openrelay.metered.ca:443",
        "turn:openrelay.metered.ca:443?transport=tcp",
    )
]

# ICE server list returned verbatim by the /ice endpoint: STUN first, then TURN.
ICE_SERVERS = _STUN_SERVERS + _TURN_SERVERS
@app.get("/")
async def index():
    """Serve the client page.

    Reads ``index.html`` (resolved against the process working directory)
    on every request and returns its contents as an HTML response.

    Raises:
        OSError: if ``index.html`` cannot be read.
        UnicodeDecodeError: if the file is not valid UTF-8.
    """
    # The response is sent as UTF-8 text, so decode the file as UTF-8
    # explicitly instead of relying on the platform's locale encoding.
    return HTMLResponse(Path("index.html").read_text(encoding="utf-8"))
@app.get("/ping")
async def ping():
    """Liveness probe: always answers with a fixed ``status`` payload."""
    payload = dict(status="alive")
    return payload
@app.get("/ice")
async def get_ice():
    """Expose the module-level ICE server list under the ``iceServers`` key."""
    return dict(iceServers=ICE_SERVERS)
@app.websocket("/ws/{room}")
async def ws_endpoint(websocket: WebSocket, room: str):
    """Handle one peer's WebSocket connection for its whole lifetime.

    The peer is given a random 8-hex-digit id, registered in ``rooms[room]``
    (created on demand) and told its id via an ``assigned`` message. After
    that:

    * text frames are parsed as JSON and passed to ``handle_text``;
    * binary frames shorter than 2 bytes are dropped; frames whose first
      byte is ``2`` are dropped unless the sender is the room's current
      broadcaster; everything else goes to ``relay_bytes``.

    When the connection ends for any reason the peer is unregistered, the
    broadcaster slot is released if this peer held it, and the room is
    either deleted (if now empty) or its remaining peers are notified.

    Args:
        websocket: The connection to serve; accepted here.
        room: Room name taken from the URL path.
    """
    await websocket.accept()
    pid = uuid.uuid4().hex[:8]
    r = rooms.setdefault(room, {"peers": {}, "broadcaster": None})
    r["peers"][pid] = websocket
    log.info(f"[+] {pid} joined '{room}' ({len(r['peers'])} peers)")

    reason = "WebSocketDisconnect"
    try:
        # Inside the try so that a client vanishing right after the
        # handshake is still unregistered below.
        await websocket.send_text(json.dumps({"type": "assigned", "id": pid}))
        await broadcast_state(room)
        while True:
            msg = await websocket.receive()
            # receive() reports a closed connection as a message rather than
            # an exception; stop here instead of calling receive() again.
            if msg.get("type") == "websocket.disconnect":
                break
            # A payload key may be present but None, so test the value.
            text = msg.get("text")
            raw = msg.get("bytes")
            if text is not None:
                try:
                    data = json.loads(text)
                except ValueError:
                    # One malformed frame should not tear down the peer.
                    log.warning(f"[!] {pid} sent malformed JSON in '{room}', ignored")
                    continue
                await handle_text(room, pid, data)
            elif raw is not None:
                if len(raw) < 2:
                    continue
                # Tag 2 frames are only accepted from the current broadcaster.
                if raw[0] == 2 and r["broadcaster"] != pid:
                    continue
                await relay_bytes(room, pid, raw)
    except Exception as e:
        # Best-effort endpoint: any failure simply ends this connection.
        reason = type(e).__name__
    finally:
        # finally (not except) so the registry is also cleaned up when the
        # handler task is cancelled, e.g. on server shutdown.
        log.info(f"[-] {pid} left '{room}': {reason}")
        r["peers"].pop(pid, None)
        if r["broadcaster"] == pid:
            r["broadcaster"] = None
        if r["peers"]:
            await notify_leave(room, pid)
            await broadcast_state(room)
        elif rooms.get(room) is r:
            # Identity check: if this room was already dropped and the name
            # reused, the newer room must not be deleted.
            rooms.pop(room, None)
            log.info(f"[x] Room '{room}' removed (empty)")
async def handle_text(room, sender, data):
    """Dispatch one decoded JSON control message from a peer.

    Recognised ``data["type"]`` values:

    * ``set_broadcaster`` - make ``sender`` the room's broadcaster, send
      ``force_viewer`` to the previous broadcaster (if it is a different,
      still-registered peer) and re-broadcast room state.
    * ``set_viewer`` - release the broadcaster slot if ``sender`` holds it,
      then re-broadcast room state.
    * ``offer`` / ``answer`` / ``ice`` / ``negotiate`` - forward the message
      to the peer named in ``data["to"]``, adding ``"from": sender``.
    * ``p2p_failed`` - logged only.

    Unknown types, non-object payloads, and messages for a room that no
    longer exists are ignored.

    Args:
        room: Room name the sender belongs to.
        sender: Peer id of the sender.
        data: The decoded JSON value received from the peer.
    """
    r = rooms.get(room)
    if not r:
        return
    # Any JSON value can arrive here; only objects have a "type" to dispatch.
    if not isinstance(data, dict):
        return
    t = data.get("type")
    if t == "set_broadcaster":
        old = r["broadcaster"]
        r["broadcaster"] = sender
        log.info(f"[B] {sender} broadcasting in '{room}'")
        if old and old != sender and old in r["peers"]:
            try:
                await r["peers"][old].send_text(json.dumps({"type": "force_viewer"}))
            except Exception as e:
                # Best effort: the old broadcaster may already be gone.
                log.warning(f"[!] force_viewer to {old} failed: {e}")
        await broadcast_state(room)
    elif t == "set_viewer":
        if r["broadcaster"] == sender:
            r["broadcaster"] = None
            log.info(f"[V] {sender} stopped broadcasting in '{room}'")
        # Sent even when nothing changed, so the requester still receives a
        # fresh state snapshot.
        await broadcast_state(room)
    elif t in ("offer", "answer", "ice", "negotiate"):
        target = data.get("to")
        # Peer ids are strings; rejecting anything else also avoids a
        # TypeError from looking up an unhashable value (list/dict).
        if not target or not isinstance(target, str):
            return
        target_ws = r["peers"].get(target)
        if target_ws is None:
            return
        fwd = dict(data)
        fwd["from"] = sender
        try:
            await target_ws.send_text(json.dumps(fwd))
            log.info(f"[S] {t} from {sender} -> {target}")
        except Exception as e:
            log.warning(f"[!] Signal fail {t} {sender}->{target}: {e}")
    elif t == "p2p_failed":
        log.info(f"[!] P2P failed for {sender} in '{room}', falling back to relay")
async def relay_bytes(room, sender, data):
    """Forward a binary frame to every peer in ``room`` except ``sender``.

    Peers whose send fails are removed from the room's peer table after the
    loop and the removal is logged. Does nothing if the room does not exist.

    Args:
        room: Room name.
        sender: Peer id that produced the frame; it is skipped.
        data: The raw frame to forward unchanged.
    """
    r = rooms.get(room)
    if not r:
        return
    dead = []
    # Iterate over a snapshot: the peer table can change while we await.
    for pid, ws in list(r["peers"].items()):
        if pid == sender:
            continue
        try:
            await ws.send_bytes(data)
        except Exception:
            # Deliberately not a bare except: task cancellation must
            # propagate instead of marking a healthy peer as dead.
            dead.append(pid)
    for d in dead:
        r["peers"].pop(d, None)
        log.info(f"[-] {d} dead connection removed from '{room}'")
async def broadcast_state(room):
    """Send every peer in ``room`` a ``state`` message tailored to it.

    Each message carries the recipient's own id, the current peer id list
    and count, the broadcaster id, and whether the recipient is the
    broadcaster. Peers whose send fails are removed from the peer table
    afterwards. Does nothing if the room does not exist.

    Args:
        room: Room name.
    """
    r = rooms.get(room)
    if not r:
        return
    dead = []
    # Iterate over a snapshot: the peer table can change while we await.
    for pid, ws in list(r["peers"].items()):
        state = {
            "type": "state",
            "id": pid,
            "peers": list(r["peers"].keys()),
            "peer_count": len(r["peers"]),
            "broadcaster": r["broadcaster"],
            "is_broadcaster": pid == r["broadcaster"],
        }
        try:
            await ws.send_text(json.dumps(state))
        except Exception:
            # Deliberately not a bare except: task cancellation must
            # propagate instead of marking a healthy peer as dead.
            dead.append(pid)
    for d in dead:
        r["peers"].pop(d, None)
        log.info(f"[-] {d} dead connection removed from '{room}'")
async def notify_leave(room, pid):
    """Tell every peer still in ``room`` that peer ``pid`` has left.

    Delivery is best effort: a failed send is logged at debug level and the
    remaining peers are still notified. Does nothing if the room does not
    exist.

    Args:
        room: Room name.
        pid: Id of the peer that left.
    """
    r = rooms.get(room)
    if not r:
        return
    # Same payload for everyone, so serialise it once.
    payload = json.dumps({"type": "peer_left", "id": pid})
    # Iterate over a snapshot: the peer table can change while we await.
    for ws in list(r["peers"].values()):
        try:
            await ws.send_text(payload)
        except Exception as e:
            # Not a bare except, so task cancellation still propagates.
            log.debug(f"[!] peer_left for {pid} not delivered in '{room}': {e}")
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860.
    bind_host, bind_port = "0.0.0.0", 7860
    uvicorn.run(app, host=bind_host, port=bind_port)