|
|
import os |
|
|
import uuid |
|
|
import time |
|
|
import json |
|
|
import random |
|
|
import asyncio |
|
|
from typing import Dict, Any, Optional |
|
|
|
|
|
from fastapi import ( |
|
|
FastAPI, Request, WebSocket, WebSocketDisconnect, |
|
|
UploadFile, File, Form |
|
|
) |
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from fastapi.templating import Jinja2Templates |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) |
|
|
|
|
|
|
|
|
TMP_DIR = os.environ.get("TMP_DIR", "/tmp/music_sync") |
|
|
os.makedirs(TMP_DIR, exist_ok=True) |
|
|
|
|
|
try: |
|
|
os.chmod(TMP_DIR, 0o777) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
app = FastAPI() |
|
|
templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates")) |
|
|
app.mount("/static", StaticFiles(directory=os.path.join(BASE_DIR, "static")), name="static") |
|
|
|
|
|
app.mount("/media", StaticFiles(directory=TMP_DIR), name="media") |
|
|
|
|
|
|
|
|
rooms: Dict[str, Dict[str, Any]] = {} |
|
|
rooms_lock = asyncio.Lock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def now_ts() -> float: |
|
|
return time.time() |
|
|
|
|
|
def gen_room_code() -> str: |
|
|
while True: |
|
|
code = f"{random.randint(0, 999999):06d}" |
|
|
if code not in rooms: |
|
|
return code |
|
|
|
|
|
def make_room(room_code: str, creator_name: str) -> Dict[str, Any]: |
|
|
room = { |
|
|
"room_code": room_code, |
|
|
"created_at": now_ts(), |
|
|
"admin_token": uuid.uuid4().hex, |
|
|
"tracks": [], |
|
|
"playlist": [], |
|
|
"current_track_id": None, |
|
|
"play_state": "paused", |
|
|
"play_position": 0.0, |
|
|
"play_started_at": None, |
|
|
"seq": 0, |
|
|
"participants": {}, |
|
|
"lock": asyncio.Lock() |
|
|
} |
|
|
rooms[room_code] = room |
|
|
return room |
|
|
|
|
|
def add_track_to_room(room: Dict[str, Any], filename: str, original_name: str, uploader_name: str) -> Dict[str, Any]: |
|
|
track_id = uuid.uuid4().hex |
|
|
track = { |
|
|
"track_id": track_id, |
|
|
"filename": filename, |
|
|
"original_name": original_name, |
|
|
"url": f"/media/{filename}", |
|
|
"uploader": uploader_name, |
|
|
"duration": None |
|
|
} |
|
|
room["tracks"].append(track) |
|
|
room["playlist"].append(track_id) |
|
|
return track |
|
|
|
|
|
async def broadcast_to_room(room_code: str, message: Dict[str, Any]) -> None: |
|
|
room = rooms.get(room_code) |
|
|
if not room: |
|
|
return |
|
|
participants = list(room["participants"].values()) |
|
|
dead_sockets = [] |
|
|
for p in participants: |
|
|
ws: Optional[WebSocket] = p.get("ws") |
|
|
if ws: |
|
|
try: |
|
|
await ws.send_json(message) |
|
|
except Exception: |
|
|
dead_sockets.append(ws) |
|
|
|
|
|
def get_room_state_payload(room: Dict[str, Any]) -> Dict[str, Any]: |
|
|
return { |
|
|
"type": "sync_state", |
|
|
"seq": room["seq"], |
|
|
"room_code": room["room_code"], |
|
|
"tracks": room["tracks"], |
|
|
"playlist": room["playlist"], |
|
|
"current_track_id": room["current_track_id"], |
|
|
"play_state": room["play_state"], |
|
|
"play_position": room["play_position"], |
|
|
"play_started_at": room["play_started_at"], |
|
|
"server_time": now_ts(), |
|
|
"participants": [p["name"] for p in room["participants"].values()] |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
|
async def index(request: Request): |
|
|
return templates.TemplateResponse("index.html", {"request": request}) |
|
|
|
|
|
@app.post("/create_room") |
|
|
async def create_room(name: str = Form(...)): |
|
|
name = (name or "Guest").strip()[:40] |
|
|
async with rooms_lock: |
|
|
room_code = gen_room_code() |
|
|
room = make_room(room_code, name) |
|
|
session_id = uuid.uuid4().hex |
|
|
redirect_url = f"/room/{room_code}?name={name}&session_id={session_id}&admin=true&admin_token={room['admin_token']}" |
|
|
return RedirectResponse(url=redirect_url, status_code=303) |
|
|
|
|
|
@app.post("/join_room") |
|
|
async def join_room(name: str = Form(...), room_code: str = Form(...)): |
|
|
room_code = (room_code or "").strip() |
|
|
if room_code not in rooms: |
|
|
|
|
|
return templates.TemplateResponse("index.html", {"request": Request, "error": f"Room {room_code} not found."}) |
|
|
name = (name or "Guest").strip()[:40] |
|
|
session_id = uuid.uuid4().hex |
|
|
redirect_url = f"/room/{room_code}?name={name}&session_id={session_id}" |
|
|
return RedirectResponse(url=redirect_url, status_code=303) |
|
|
|
|
|
@app.get("/room/{room_code}", response_class=HTMLResponse) |
|
|
async def room_page(request: Request, room_code: str, name: str = "", session_id: str = "", admin: str = "false", admin_token: str = ""): |
|
|
room = rooms.get(room_code) |
|
|
if room is None: |
|
|
return HTMLResponse(f"<h2>Room {room_code} not found.</h2>", status_code=404) |
|
|
context = { |
|
|
"request": request, |
|
|
"room_code": room_code, |
|
|
"name": name, |
|
|
"session_id": session_id, |
|
|
"is_admin": (admin.lower() == "true"), |
|
|
"admin_token": admin_token |
|
|
} |
|
|
return templates.TemplateResponse("room.html", context) |
|
|
|
|
|
@app.post("/upload") |
|
|
async def upload_track(room_code: str = Form(...), session_id: str = Form(...), file: UploadFile = File(...), uploader_name: str = Form(None)): |
|
|
if room_code not in rooms: |
|
|
return JSONResponse({"ok": False, "error": "Room not found"}, status_code=404) |
|
|
room = rooms[room_code] |
|
|
filename = file.filename or "track.mp3" |
|
|
|
|
|
room_dir = os.path.join(TMP_DIR, room_code) |
|
|
os.makedirs(room_dir, exist_ok=True) |
|
|
try: |
|
|
os.chmod(room_dir, 0o777) |
|
|
except Exception: |
|
|
pass |
|
|
ext = os.path.splitext(filename)[1] or ".mp3" |
|
|
stored_name = f"{uuid.uuid4().hex}{ext}" |
|
|
stored_path = os.path.join(room_dir, stored_name) |
|
|
contents = await file.read() |
|
|
with open(stored_path, "wb") as f: |
|
|
f.write(contents) |
|
|
uploader_name = (uploader_name or "anonymous")[:40] |
|
|
rel_filename = f"{room_code}/{stored_name}" |
|
|
async with room["lock"]: |
|
|
track = add_track_to_room(room, rel_filename, filename, uploader_name) |
|
|
room["seq"] += 1 |
|
|
payload = { |
|
|
"type": "track_added", |
|
|
"seq": room["seq"], |
|
|
"track": track, |
|
|
"server_time": now_ts() |
|
|
} |
|
|
await broadcast_to_room(room_code, payload) |
|
|
return JSONResponse({"ok": True, "track": track}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.websocket("/ws/{room_code}") |
|
|
async def websocket_endpoint(websocket: WebSocket, room_code: str): |
|
|
await websocket.accept() |
|
|
params = websocket.query_params |
|
|
session_id = params.get("session_id") or uuid.uuid4().hex |
|
|
name = (params.get("name") or "Guest")[:40] |
|
|
admin_token = params.get("admin_token") |
|
|
|
|
|
if room_code not in rooms: |
|
|
await websocket.send_json({"type": "error", "message": "Room not found"}) |
|
|
await websocket.close() |
|
|
return |
|
|
|
|
|
room = rooms[room_code] |
|
|
is_admin_check = (admin_token is not None and admin_token == room.get("admin_token")) |
|
|
|
|
|
participant = { |
|
|
"session_id": session_id, |
|
|
"name": name, |
|
|
"is_admin": is_admin_check, |
|
|
"ws": websocket, |
|
|
"last_ping_ts": now_ts(), |
|
|
"joined_at": now_ts() |
|
|
} |
|
|
|
|
|
async with room["lock"]: |
|
|
room["participants"][session_id] = participant |
|
|
room["seq"] += 1 |
|
|
|
|
|
await broadcast_to_room(room_code, {"type": "user_joined", "name": name, "server_time": now_ts()}) |
|
|
|
|
|
try: |
|
|
await websocket.send_json({ |
|
|
"type": "joined", |
|
|
"session_id": session_id, |
|
|
"is_admin": participant["is_admin"], |
|
|
"server_time": now_ts(), |
|
|
"seq": room["seq"] |
|
|
}) |
|
|
await websocket.send_json(get_room_state_payload(room)) |
|
|
|
|
|
while True: |
|
|
data = await websocket.receive_text() |
|
|
try: |
|
|
msg = json.loads(data) |
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
mtype = msg.get("type") |
|
|
if mtype == "ping": |
|
|
client_time = msg.get("client_time") |
|
|
await websocket.send_json({"type": "pong", "client_time": client_time, "server_time": now_ts()}) |
|
|
participant["last_ping_ts"] = now_ts() |
|
|
continue |
|
|
|
|
|
if mtype == "request_sync": |
|
|
await websocket.send_json(get_room_state_payload(room)) |
|
|
continue |
|
|
|
|
|
is_admin = participant.get("is_admin", False) |
|
|
|
|
|
if mtype == "play": |
|
|
if not is_admin: continue |
|
|
pos = float(msg.get("position", 0.0)) |
|
|
BUFFER = 0.45 |
|
|
play_at = now_ts() + BUFFER |
|
|
async with room["lock"]: |
|
|
room["play_state"] = "playing" |
|
|
room["current_track_id"] = room.get("current_track_id") or (room["playlist"][0] if room["playlist"] else None) |
|
|
room["play_position"] = pos |
|
|
room["play_started_at"] = play_at - pos |
|
|
room["seq"] += 1 |
|
|
seq = room["seq"] |
|
|
payload = {"type": "play", "seq": seq, "play_at": play_at, "position": pos, "track_id": room["current_track_id"], "server_time": now_ts()} |
|
|
await broadcast_to_room(room_code, payload) |
|
|
continue |
|
|
|
|
|
if mtype == "pause": |
|
|
if not is_admin: continue |
|
|
pos = float(msg.get("position", 0.0)) |
|
|
async with room["lock"]: |
|
|
room["play_state"] = "paused" |
|
|
room["play_position"] = pos |
|
|
room["play_started_at"] = None |
|
|
room["seq"] += 1 |
|
|
seq = room["seq"] |
|
|
payload = {"type": "pause", "seq": seq, "position": pos, "track_id": room["current_track_id"], "server_time": now_ts()} |
|
|
await broadcast_to_room(room_code, payload) |
|
|
continue |
|
|
|
|
|
if mtype == "seek": |
|
|
if not is_admin: continue |
|
|
pos = float(msg.get("position", 0.0)) |
|
|
async with room["lock"]: |
|
|
room["play_position"] = pos |
|
|
if room["play_state"] == "playing": |
|
|
room["play_started_at"] = now_ts() - pos |
|
|
room["seq"] += 1 |
|
|
seq = room["seq"] |
|
|
payload = {"type": "seek", "seq": seq, "position": pos, "track_id": room["current_track_id"], "server_time": now_ts()} |
|
|
await broadcast_to_room(room_code, payload) |
|
|
continue |
|
|
|
|
|
if mtype == "set_track": |
|
|
if not is_admin: continue |
|
|
track_id = msg.get("track_id") |
|
|
exists = any(t["track_id"] == track_id for t in room["tracks"]) |
|
|
if not exists: continue |
|
|
async with room["lock"]: |
|
|
room["current_track_id"] = track_id |
|
|
room["play_state"] = "paused" |
|
|
room["play_position"] = 0.0 |
|
|
room["play_started_at"] = None |
|
|
room["seq"] += 1 |
|
|
seq = room["seq"] |
|
|
payload = {"type": "set_track", "seq": seq, "track_id": track_id, "server_time": now_ts()} |
|
|
await broadcast_to_room(room_code, payload) |
|
|
continue |
|
|
|
|
|
except WebSocketDisconnect: |
|
|
pass |
|
|
except Exception as e: |
|
|
print(f"Error in websocket: {e}") |
|
|
finally: |
|
|
async with room["lock"]: |
|
|
room["participants"].pop(session_id, None) |
|
|
await broadcast_to_room(room_code, {"type": "user_left", "name": name, "server_time": now_ts()}) |