"""Synchronized music rooms.

A FastAPI application that keeps an in-memory registry of rooms. Each room
holds uploaded tracks, a playlist and a shared playback state; participants
connect over a WebSocket and the room admin's play/pause/seek/set_track
commands are broadcast to everyone in the room.
"""
import asyncio
import html
import json
import math
import os
import random
import time
import uuid
from typing import Any, Dict, Optional
from urllib.parse import urlencode

from fastapi import (
    FastAPI, Request, WebSocket, WebSocketDisconnect,
    UploadFile, File, Form
)
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

# ---------------------------
# Configuration & setup
# ---------------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Use /tmp/music_sync by default (container-safe). Can be overridden via TMP_DIR env var.
TMP_DIR = os.environ.get("TMP_DIR", "/tmp/music_sync")
os.makedirs(TMP_DIR, exist_ok=True)
# ensure writable (best effort: a failing chmod must not stop the app from starting)
try:
    os.chmod(TMP_DIR, 0o777)
except OSError:
    pass

# Delay (seconds) added to "now" when scheduling a play command, so every
# client receives the message before the announced start time.
PLAY_BUFFER_SECONDS = 0.45

# Message types that change playback state and therefore require admin rights.
CONTROL_TYPES = ("play", "pause", "seek", "set_track")

app = FastAPI()
templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates"))
app.mount("/static", StaticFiles(directory=os.path.join(BASE_DIR, "static")), name="static")
# serve media from the temp dir
app.mount("/media", StaticFiles(directory=TMP_DIR), name="media")

# In-memory rooms registry
rooms: Dict[str, Dict[str, Any]] = {}
rooms_lock = asyncio.Lock()


# ---------------------------
# Helpers
# ---------------------------
def now_ts() -> float:
    """Return the current server time as a Unix timestamp in seconds."""
    return time.time()


def gen_room_code() -> str:
    """Return a random six-digit room code that is not already in ``rooms``."""
    while True:
        code = f"{random.randint(0, 999999):06d}"
        if code not in rooms:
            return code


def _clean_name(raw: Optional[str], default: str = "Guest") -> str:
    """Trim *raw*, cap it at 40 characters and fall back to *default* if empty."""
    return (raw or "").strip()[:40] or default


def make_room(room_code: str, creator_name: str) -> Dict[str, Any]:
    """Create a room dict, register it in ``rooms`` under *room_code* and return it.

    *creator_name* is accepted for interface compatibility; it is not stored.
    """
    room = {
        "room_code": room_code,
        "created_at": now_ts(),
        "admin_token": uuid.uuid4().hex,
        "tracks": [],  # list of track dicts
        "playlist": [],  # ordered track_ids
        "current_track_id": None,
        "play_state": "paused",  # Changed from stopped
        "play_position": 0.0,
        "play_started_at": None,
        "seq": 0,
        "participants": {},  # session_id -> participant dict
        "lock": asyncio.Lock()
    }
    rooms[room_code] = room
    return room


def add_track_to_room(room: Dict[str, Any], filename: str, original_name: str,
                      uploader_name: str) -> Dict[str, Any]:
    """Append a new track to *room*'s track list and playlist and return it.

    *filename* is the path relative to ``TMP_DIR`` (e.g. ``roomcode/file.mp3``);
    it is also used to build the ``/media/...`` URL of the track.
    """
    track_id = uuid.uuid4().hex
    track = {
        "track_id": track_id,
        "filename": filename,  # relative path under TMP_DIR (e.g. roomcode/file.mp3)
        "original_name": original_name,
        "url": f"/media/{filename}",
        "uploader": uploader_name,
        "duration": None
    }
    room["tracks"].append(track)
    room["playlist"].append(track_id)
    return track


async def broadcast_to_room(room_code: str, message: Dict[str, Any]) -> None:
    """Send *message* as JSON to every participant socket of the room.

    Does nothing if the room does not exist. Participants whose socket fails
    to send are removed from the room afterwards. Acquires the room lock for
    that removal, so callers must NOT hold ``room["lock"]`` when calling this.
    """
    room = rooms.get(room_code)
    if not room:
        return
    # Snapshot first: joins/leaves during the awaits below must not disturb iteration.
    snapshot = list(room["participants"].items())
    dead = []
    for session_id, participant in snapshot:
        ws: Optional[WebSocket] = participant.get("ws")
        if ws is None:
            continue
        try:
            await ws.send_json(message)
        except Exception:
            # Any send failure means this socket is unusable; prune it below.
            dead.append((session_id, participant))
    if dead:
        async with room["lock"]:
            for session_id, participant in dead:
                # Only drop the entry if it is still the one that failed; the
                # same session_id may have reconnected in the meantime.
                if room["participants"].get(session_id) is participant:
                    del room["participants"][session_id]


def get_room_state_payload(room: Dict[str, Any]) -> Dict[str, Any]:
    """Build the full ``sync_state`` message describing *room* right now."""
    return {
        "type": "sync_state",
        "seq": room["seq"],
        "room_code": room["room_code"],
        "tracks": room["tracks"],
        "playlist": room["playlist"],
        "current_track_id": room["current_track_id"],
        "play_state": room["play_state"],
        "play_position": room["play_position"],
        "play_started_at": room["play_started_at"],
        "server_time": now_ts(),
        "participants": [p["name"] for p in room["participants"].values()]
    }


def _parse_position(msg: Dict[str, Any]) -> Optional[float]:
    """Return the finite ``position`` of *msg* (default 0.0), or None if invalid."""
    try:
        pos = float(msg.get("position", 0.0))
    except (TypeError, ValueError):
        return None
    # NaN/Infinity would be re-broadcast as invalid JSON tokens.
    return pos if math.isfinite(pos) else None


async def _apply_control(room: Dict[str, Any], mtype: str,
                         msg: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Apply an admin playback command to *room*.

    Returns the message to broadcast, or None when the command is invalid
    (unknown track, malformed position, unknown type) and must be ignored.
    """
    if mtype == "set_track":
        track_id = msg.get("track_id")
        if not any(t["track_id"] == track_id for t in room["tracks"]):
            return None
        async with room["lock"]:
            room["current_track_id"] = track_id
            room["play_state"] = "paused"
            room["play_position"] = 0.0
            room["play_started_at"] = None
            room["seq"] += 1
            seq = room["seq"]
        return {"type": "set_track", "seq": seq, "track_id": track_id,
                "server_time": now_ts()}

    pos = _parse_position(msg)
    if pos is None:
        return None

    if mtype == "play":
        play_at = now_ts() + PLAY_BUFFER_SECONDS
        async with room["lock"]:
            room["play_state"] = "playing"
            # Default to the first playlist entry when no track was selected yet.
            room["current_track_id"] = room.get("current_track_id") or (
                room["playlist"][0] if room["playlist"] else None
            )
            room["play_position"] = pos
            # Timestamp at which position 0 of the track would have started.
            room["play_started_at"] = play_at - pos
            room["seq"] += 1
            seq = room["seq"]
            track_id = room["current_track_id"]
        return {"type": "play", "seq": seq, "play_at": play_at, "position": pos,
                "track_id": track_id, "server_time": now_ts()}

    if mtype == "pause":
        async with room["lock"]:
            room["play_state"] = "paused"
            room["play_position"] = pos
            room["play_started_at"] = None
            room["seq"] += 1
            seq = room["seq"]
            track_id = room["current_track_id"]
        return {"type": "pause", "seq": seq, "position": pos,
                "track_id": track_id, "server_time": now_ts()}

    if mtype == "seek":
        async with room["lock"]:
            room["play_position"] = pos
            if room["play_state"] == "playing":
                room["play_started_at"] = now_ts() - pos
            room["seq"] += 1
            seq = room["seq"]
            track_id = room["current_track_id"]
        return {"type": "seek", "seq": seq, "position": pos,
                "track_id": track_id, "server_time": now_ts()}

    return None


# ---------------------------
# HTTP endpoints
# ---------------------------
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the landing page."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/create_room")
async def create_room(name: str = Form(...)):
    """Create a new room and redirect the creator to it as admin."""
    name = _clean_name(name)
    async with rooms_lock:
        room_code = gen_room_code()
        room = make_room(room_code, name)
    session_id = uuid.uuid4().hex
    # urlencode so that '&', '#', spaces etc. in the name cannot corrupt the query.
    query = urlencode({
        "name": name,
        "session_id": session_id,
        "admin": "true",
        "admin_token": room["admin_token"],
    })
    return RedirectResponse(url=f"/room/{room_code}?{query}", status_code=303)


@app.post("/join_room")
async def join_room(request: Request, name: str = Form(...), room_code: str = Form(...)):
    """Redirect the user into an existing room, or re-render the index with an error."""
    room_code = (room_code or "").strip()
    if room_code not in rooms:
        # re-render index with error
        return templates.TemplateResponse(
            "index.html",
            {"request": request, "error": f"Room {room_code} not found."},
        )
    name = _clean_name(name)
    session_id = uuid.uuid4().hex
    query = urlencode({"name": name, "session_id": session_id})
    return RedirectResponse(url=f"/room/{room_code}?{query}", status_code=303)


@app.get("/room/{room_code}", response_class=HTMLResponse)
async def room_page(request: Request, room_code: str, name: str = "", session_id: str = "",
                    admin: str = "false", admin_token: str = ""):
    """Render the room page, or a 404 page if the room does not exist."""
    room = rooms.get(room_code)
    if room is None:
        # room_code comes straight from the URL: escape it before echoing as HTML.
        return HTMLResponse(f"Room {html.escape(room_code)} not found.", status_code=404)
    context = {
        "request": request,
        "room_code": room_code,
        "name": name,
        "session_id": session_id,
        "is_admin": (admin.lower() == "true"),
        "admin_token": admin_token
    }
    return templates.TemplateResponse("room.html", context)


@app.post("/upload")
async def upload_track(room_code: str = Form(...), session_id: str = Form(...),
                       file: UploadFile = File(...), uploader_name: str = Form(None)):
    """Store an uploaded file under the room's directory and announce the new track.

    *session_id* is a required form field but is not currently checked.
    Returns a JSON body with ``ok`` and, on success, the created ``track``.
    """
    room = rooms.get(room_code)
    if room is None:
        return JSONResponse({"ok": False, "error": "Room not found"}, status_code=404)

    filename = file.filename or "track.mp3"
    # ensure room dir under TMP_DIR
    room_dir = os.path.join(TMP_DIR, room_code)
    os.makedirs(room_dir, exist_ok=True)
    try:
        os.chmod(room_dir, 0o777)
    except OSError:
        pass

    # The extension is client-controlled and ends up in the media URL, so only
    # accept a short ASCII-alphanumeric one; anything else falls back to .mp3.
    ext = os.path.splitext(filename)[1]
    suffix = ext[1:]
    if not (0 < len(suffix) <= 10 and suffix.isascii() and suffix.isalnum()):
        ext = ".mp3"
    stored_name = f"{uuid.uuid4().hex}{ext}"
    stored_path = os.path.join(room_dir, stored_name)

    contents = await file.read()
    with open(stored_path, "wb") as f:
        f.write(contents)

    uploader_name = _clean_name(uploader_name, "anonymous")
    rel_filename = f"{room_code}/{stored_name}"
    async with room["lock"]:
        track = add_track_to_room(room, rel_filename, filename, uploader_name)
        room["seq"] += 1
        seq = room["seq"]
    payload = {
        "type": "track_added",
        "seq": seq,
        "track": track,
        "server_time": now_ts()
    }
    await broadcast_to_room(room_code, payload)
    return JSONResponse({"ok": True, "track": track})


# ---------------------------
# WebSocket endpoint
# ---------------------------
@app.websocket("/ws/{room_code}")
async def websocket_endpoint(websocket: WebSocket, room_code: str):
    """Register the connection as a room participant and serve its messages.

    Query parameters: ``session_id`` (generated if absent), ``name`` and
    ``admin_token``. Handles ``ping`` and ``request_sync`` for everyone and
    ``play`` / ``pause`` / ``seek`` / ``set_track`` for the admin only.
    Malformed messages are ignored rather than closing the connection.
    """
    await websocket.accept()
    params = websocket.query_params
    session_id = params.get("session_id") or uuid.uuid4().hex
    name = _clean_name(params.get("name"))
    admin_token = params.get("admin_token")

    room = rooms.get(room_code)
    if room is None:
        await websocket.send_json({"type": "error", "message": "Room not found"})
        await websocket.close()
        return

    is_admin = admin_token is not None and admin_token == room.get("admin_token")
    participant = {
        "session_id": session_id,
        "name": name,
        "is_admin": is_admin,
        "ws": websocket,
        "last_ping_ts": now_ts(),
        "joined_at": now_ts()
    }
    async with room["lock"]:
        room["participants"][session_id] = participant
        room["seq"] += 1
    await broadcast_to_room(room_code, {"type": "user_joined", "name": name,
                                        "server_time": now_ts()})

    try:
        await websocket.send_json({
            "type": "joined",
            "session_id": session_id,
            "is_admin": participant["is_admin"],
            "server_time": now_ts(),
            "seq": room["seq"]
        })
        await websocket.send_json(get_room_state_payload(room))

        while True:
            data = await websocket.receive_text()
            try:
                msg = json.loads(data)
            except (ValueError, RecursionError):
                continue
            # Valid JSON that is not an object (list, number, ...) carries no command.
            if not isinstance(msg, dict):
                continue
            mtype = msg.get("type")

            if mtype == "ping":
                client_time = msg.get("client_time")
                await websocket.send_json({"type": "pong", "client_time": client_time,
                                           "server_time": now_ts()})
                participant["last_ping_ts"] = now_ts()
                continue

            if mtype == "request_sync":
                await websocket.send_json(get_room_state_payload(room))
                continue

            if mtype in CONTROL_TYPES:
                if not participant.get("is_admin", False):
                    continue
                payload = await _apply_control(room, mtype, msg)
                if payload is not None:
                    await broadcast_to_room(room_code, payload)
                continue
    except WebSocketDisconnect:
        pass  # Handled in finally
    except Exception as e:
        print(f"Error in websocket: {e}")
    finally:
        async with room["lock"]:
            current = room["participants"].get(session_id)
            # A newer connection may have reused this session_id; leave it alone.
            replaced = current is not None and current is not participant
            if not replaced:
                room["participants"].pop(session_id, None)
        if not replaced:
            await broadcast_to_room(room_code, {"type": "user_left", "name": name,
                                                "server_time": now_ts()})