import json import secrets import hashlib from datetime import datetime, timezone from typing import Dict from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Request from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from pydantic import BaseModel app = FastAPI(title="Private Chat") templates = Jinja2Templates(directory="templates") users: Dict[str, dict] = {} connections: Dict[str, WebSocket] = {} def generate_token() -> str: return secrets.token_urlsafe(48) def room_id(a: str, b: str) -> str: return hashlib.sha256("".join(sorted([a, b])).encode()).hexdigest()[:16] class JoinRequest(BaseModel): display_name: str @app.get("/", response_class=HTMLResponse) async def index(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.post("/api/join") async def join(body: JoinRequest): name = body.display_name.strip() if not name or len(name) > 32: raise HTTPException(400, "Display name must be 1-32 characters") uid = generate_token() users[uid] = {"name": name, "online": True, "joined": datetime.now(timezone.utc).isoformat()} return {"user_id": uid, "display_name": name} @app.get("/api/users/{user_id}") async def list_users(user_id: str): if user_id not in users: raise HTTPException(401, "Invalid session") return { "users": [ {"id": uid, "name": u["name"]} for uid, u in users.items() if u["online"] and uid != user_id ] } @app.post("/api/rename") async def rename(data: dict): uid = data.get("user_id", "") name = data.get("display_name", "").strip() if uid not in users: raise HTTPException(401, "Invalid session") if not name or len(name) > 32: raise HTTPException(400, "Display name must be 1-32 characters") old = users[uid]["name"] users[uid]["name"] = name return {"display_name": name, "previous_name": old} @app.websocket("/ws/{user_id}") async def ws(websocket: WebSocket, user_id: str): if user_id not in users: await websocket.close(code=4001) return await websocket.accept() connections[user_id] = websocket users[user_id]["online"] = True for cid, conn in connections.items(): if cid != user_id: try: await conn.send_text(json.dumps({ "type": "user_online", "id": user_id, "name": users[user_id]["name"] })) except Exception: pass try: while True: raw = await websocket.receive_text() msg = json.loads(raw) t = msg.get("type") if t == "message": target = msg.get("target_id", "") content = msg.get("content", "").strip() if not content or len(content) > 4000: continue if target not in users: await websocket.send_text(json.dumps({"type": "error", "msg": "User not found"})) continue rid = room_id(user_id, target) payload = json.dumps({ "type": "message", "from_id": user_id, "from_name": users[user_id]["name"], "content": content, "room_id": rid, "ts": datetime.now(timezone.utc).isoformat() }) if target in connections: try: await connections[target].send_text(payload) except Exception: pass await websocket.send_text(payload) elif t == "typing": target = msg.get("target_id", "") if target in connections: try: await connections[target].send_text(json.dumps({ "type": "typing", "from_id": user_id, "from_name": users[user_id]["name"] })) except Exception: pass elif t == "rename": new_name = msg.get("name", "").strip() if new_name and len(new_name) <= 32: users[user_id]["name"] = new_name for cid, conn in connections.items(): if cid != user_id: try: await conn.send_text(json.dumps({ "type": "user_renamed", "id": user_id, "name": new_name })) except Exception: pass except WebSocketDisconnect: pass finally: connections.pop(user_id, None) if user_id in users: users[user_id]["online"] = False for cid, conn in connections.items(): try: await conn.send_text(json.dumps({ "type": "user_offline", "id": user_id })) except Exception: pass