SYNC / main.py
triflix's picture
Create main.py
f2afb8c verified
import os
import uuid
import time
import json
import random
import asyncio
from typing import Dict, Any, Optional
from fastapi import (
FastAPI, Request, WebSocket, WebSocketDisconnect,
UploadFile, File, Form
)
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
# ---------------------------
# Configuration & setup
# ---------------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Use /tmp/music_sync by default (container-safe). Can be overridden via TMP_DIR env var.
TMP_DIR = os.environ.get("TMP_DIR", "/tmp/music_sync")
os.makedirs(TMP_DIR, exist_ok=True)
# ensure writable
try:
os.chmod(TMP_DIR, 0o777)
except Exception:
pass
app = FastAPI()
templates = Jinja2Templates(directory=os.path.join(BASE_DIR, "templates"))
app.mount("/static", StaticFiles(directory=os.path.join(BASE_DIR, "static")), name="static")
# serve media from the temp dir
app.mount("/media", StaticFiles(directory=TMP_DIR), name="media")
# In-memory rooms registry
rooms: Dict[str, Dict[str, Any]] = {}
rooms_lock = asyncio.Lock()
# ---------------------------
# Helpers
# ---------------------------
def now_ts() -> float:
return time.time()
def gen_room_code() -> str:
while True:
code = f"{random.randint(0, 999999):06d}"
if code not in rooms:
return code
def make_room(room_code: str, creator_name: str) -> Dict[str, Any]:
room = {
"room_code": room_code,
"created_at": now_ts(),
"admin_token": uuid.uuid4().hex,
"tracks": [], # list of track dicts
"playlist": [], # ordered track_ids
"current_track_id": None,
"play_state": "paused", # Changed from stopped
"play_position": 0.0,
"play_started_at": None,
"seq": 0,
"participants": {}, # session_id -> participant dict
"lock": asyncio.Lock()
}
rooms[room_code] = room
return room
def add_track_to_room(room: Dict[str, Any], filename: str, original_name: str, uploader_name: str) -> Dict[str, Any]:
track_id = uuid.uuid4().hex
track = {
"track_id": track_id,
"filename": filename, # relative path under TMP_DIR (e.g. roomcode/file.mp3)
"original_name": original_name,
"url": f"/media/{filename}",
"uploader": uploader_name,
"duration": None
}
room["tracks"].append(track)
room["playlist"].append(track_id)
return track
async def broadcast_to_room(room_code: str, message: Dict[str, Any]) -> None:
room = rooms.get(room_code)
if not room:
return
participants = list(room["participants"].values())
dead_sockets = []
for p in participants:
ws: Optional[WebSocket] = p.get("ws")
if ws:
try:
await ws.send_json(message)
except Exception:
dead_sockets.append(ws)
def get_room_state_payload(room: Dict[str, Any]) -> Dict[str, Any]:
return {
"type": "sync_state",
"seq": room["seq"],
"room_code": room["room_code"],
"tracks": room["tracks"],
"playlist": room["playlist"],
"current_track_id": room["current_track_id"],
"play_state": room["play_state"],
"play_position": room["play_position"],
"play_started_at": room["play_started_at"],
"server_time": now_ts(),
"participants": [p["name"] for p in room["participants"].values()]
}
# ---------------------------
# HTTP endpoints
# ---------------------------
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/create_room")
async def create_room(name: str = Form(...)):
name = (name or "Guest").strip()[:40]
async with rooms_lock:
room_code = gen_room_code()
room = make_room(room_code, name)
session_id = uuid.uuid4().hex
redirect_url = f"/room/{room_code}?name={name}&session_id={session_id}&admin=true&admin_token={room['admin_token']}"
return RedirectResponse(url=redirect_url, status_code=303)
@app.post("/join_room")
async def join_room(name: str = Form(...), room_code: str = Form(...)):
room_code = (room_code or "").strip()
if room_code not in rooms:
# re-render index with error
return templates.TemplateResponse("index.html", {"request": Request, "error": f"Room {room_code} not found."})
name = (name or "Guest").strip()[:40]
session_id = uuid.uuid4().hex
redirect_url = f"/room/{room_code}?name={name}&session_id={session_id}"
return RedirectResponse(url=redirect_url, status_code=303)
@app.get("/room/{room_code}", response_class=HTMLResponse)
async def room_page(request: Request, room_code: str, name: str = "", session_id: str = "", admin: str = "false", admin_token: str = ""):
room = rooms.get(room_code)
if room is None:
return HTMLResponse(f"<h2>Room {room_code} not found.</h2>", status_code=404)
context = {
"request": request,
"room_code": room_code,
"name": name,
"session_id": session_id,
"is_admin": (admin.lower() == "true"),
"admin_token": admin_token
}
return templates.TemplateResponse("room.html", context)
@app.post("/upload")
async def upload_track(room_code: str = Form(...), session_id: str = Form(...), file: UploadFile = File(...), uploader_name: str = Form(None)):
if room_code not in rooms:
return JSONResponse({"ok": False, "error": "Room not found"}, status_code=404)
room = rooms[room_code]
filename = file.filename or "track.mp3"
# ensure room dir under TMP_DIR
room_dir = os.path.join(TMP_DIR, room_code)
os.makedirs(room_dir, exist_ok=True)
try:
os.chmod(room_dir, 0o777)
except Exception:
pass
ext = os.path.splitext(filename)[1] or ".mp3"
stored_name = f"{uuid.uuid4().hex}{ext}"
stored_path = os.path.join(room_dir, stored_name)
contents = await file.read()
with open(stored_path, "wb") as f:
f.write(contents)
uploader_name = (uploader_name or "anonymous")[:40]
rel_filename = f"{room_code}/{stored_name}"
async with room["lock"]:
track = add_track_to_room(room, rel_filename, filename, uploader_name)
room["seq"] += 1
payload = {
"type": "track_added",
"seq": room["seq"],
"track": track,
"server_time": now_ts()
}
await broadcast_to_room(room_code, payload)
return JSONResponse({"ok": True, "track": track})
# ---------------------------
# WebSocket endpoint
# ---------------------------
@app.websocket("/ws/{room_code}")
async def websocket_endpoint(websocket: WebSocket, room_code: str):
await websocket.accept()
params = websocket.query_params
session_id = params.get("session_id") or uuid.uuid4().hex
name = (params.get("name") or "Guest")[:40]
admin_token = params.get("admin_token")
if room_code not in rooms:
await websocket.send_json({"type": "error", "message": "Room not found"})
await websocket.close()
return
room = rooms[room_code]
is_admin_check = (admin_token is not None and admin_token == room.get("admin_token"))
participant = {
"session_id": session_id,
"name": name,
"is_admin": is_admin_check,
"ws": websocket,
"last_ping_ts": now_ts(),
"joined_at": now_ts()
}
async with room["lock"]:
room["participants"][session_id] = participant
room["seq"] += 1
await broadcast_to_room(room_code, {"type": "user_joined", "name": name, "server_time": now_ts()})
try:
await websocket.send_json({
"type": "joined",
"session_id": session_id,
"is_admin": participant["is_admin"],
"server_time": now_ts(),
"seq": room["seq"]
})
await websocket.send_json(get_room_state_payload(room))
while True:
data = await websocket.receive_text()
try:
msg = json.loads(data)
except Exception:
continue
mtype = msg.get("type")
if mtype == "ping":
client_time = msg.get("client_time")
await websocket.send_json({"type": "pong", "client_time": client_time, "server_time": now_ts()})
participant["last_ping_ts"] = now_ts()
continue
if mtype == "request_sync":
await websocket.send_json(get_room_state_payload(room))
continue
is_admin = participant.get("is_admin", False)
if mtype == "play":
if not is_admin: continue
pos = float(msg.get("position", 0.0))
BUFFER = 0.45
play_at = now_ts() + BUFFER
async with room["lock"]:
room["play_state"] = "playing"
room["current_track_id"] = room.get("current_track_id") or (room["playlist"][0] if room["playlist"] else None)
room["play_position"] = pos
room["play_started_at"] = play_at - pos
room["seq"] += 1
seq = room["seq"]
payload = {"type": "play", "seq": seq, "play_at": play_at, "position": pos, "track_id": room["current_track_id"], "server_time": now_ts()}
await broadcast_to_room(room_code, payload)
continue
if mtype == "pause":
if not is_admin: continue
pos = float(msg.get("position", 0.0))
async with room["lock"]:
room["play_state"] = "paused"
room["play_position"] = pos
room["play_started_at"] = None
room["seq"] += 1
seq = room["seq"]
payload = {"type": "pause", "seq": seq, "position": pos, "track_id": room["current_track_id"], "server_time": now_ts()}
await broadcast_to_room(room_code, payload)
continue
if mtype == "seek":
if not is_admin: continue
pos = float(msg.get("position", 0.0))
async with room["lock"]:
room["play_position"] = pos
if room["play_state"] == "playing":
room["play_started_at"] = now_ts() - pos
room["seq"] += 1
seq = room["seq"]
payload = {"type": "seek", "seq": seq, "position": pos, "track_id": room["current_track_id"], "server_time": now_ts()}
await broadcast_to_room(room_code, payload)
continue
if mtype == "set_track":
if not is_admin: continue
track_id = msg.get("track_id")
exists = any(t["track_id"] == track_id for t in room["tracks"])
if not exists: continue
async with room["lock"]:
room["current_track_id"] = track_id
room["play_state"] = "paused"
room["play_position"] = 0.0
room["play_started_at"] = None
room["seq"] += 1
seq = room["seq"]
payload = {"type": "set_track", "seq": seq, "track_id": track_id, "server_time": now_ts()}
await broadcast_to_room(room_code, payload)
continue
except WebSocketDisconnect:
pass # Handled in finally
except Exception as e:
print(f"Error in websocket: {e}")
finally:
async with room["lock"]:
room["participants"].pop(session_id, None)
await broadcast_to_room(room_code, {"type": "user_left", "name": name, "server_time": now_ts()})