| import json |
| import secrets |
| import hashlib |
| from datetime import datetime, timezone |
| from typing import Dict |
|
|
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Request |
| from fastapi.responses import HTMLResponse |
| from fastapi.templating import Jinja2Templates |
| from pydantic import BaseModel |
|
|
# ASGI application object; the route decorators in this file register on it.
app = FastAPI(title="Private Chat")
# Template loader rooted at the relative "templates" directory.
templates = Jinja2Templates(directory="templates")


# In-memory user registry keyed by user id. Records created by the join
# endpoint carry the keys "name", "online" and "joined".
users: Dict[str, dict] = {}
# The WebSocket currently registered for each user id.
connections: Dict[str, WebSocket] = {}
|
|
def generate_token() -> str:
    """Return a fresh random, URL-safe identifier.

    The value comes from ``secrets.token_urlsafe`` with 48 bytes of
    randomness.
    """
    entropy_bytes = 48
    token = secrets.token_urlsafe(entropy_bytes)
    return token
|
|
def room_id(a: str, b: str) -> str:
    """Return a short, order-independent identifier for the pair ``(a, b)``.

    Args:
        a: First participant id.
        b: Second participant id.

    Returns:
        The first 16 hex characters of a SHA-256 digest derived from both
        ids. ``room_id(a, b) == room_id(b, a)``.
    """
    first, second = sorted((a, b))
    # Length-prefix the first id so the boundary between the two ids is
    # unambiguous. Plain concatenation would map ("ab", "c") and ("a", "bc")
    # to the same digest input and therefore the same room.
    material = f"{len(first)}:{first}{second}"
    return hashlib.sha256(material.encode()).hexdigest()[:16]
|
|
class JoinRequest(BaseModel):
    """Request body for the ``/api/join`` endpoint."""

    # Requested display name; the join handler strips it and enforces the
    # 1-32 character limit.
    display_name: str
|
|
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the ``index.html`` template for the root URL."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
|
|
@app.post("/api/join")
async def join(body: JoinRequest):
    """Register a new user and hand back the generated id.

    The display name is stripped of surrounding whitespace and must then be
    1-32 characters long.

    Raises:
        HTTPException: 400 when the stripped name is empty or too long.

    Returns:
        A dict with the new ``user_id`` and the stored ``display_name``.
    """
    display_name = body.display_name.strip()
    if not (0 < len(display_name) <= 32):
        raise HTTPException(400, "Display name must be 1-32 characters")

    new_id = generate_token()
    joined_at = datetime.now(timezone.utc).isoformat()
    # The record is created with "online" already set to True.
    users[new_id] = {"name": display_name, "online": True, "joined": joined_at}
    return {"user_id": new_id, "display_name": display_name}
|
|
@app.get("/api/users/{user_id}")
async def list_users(user_id: str):
    """List every other user whose record is currently marked online.

    Raises:
        HTTPException: 401 when ``user_id`` is not a registered id.

    Returns:
        ``{"users": [{"id": ..., "name": ...}, ...]}`` excluding the caller.
    """
    # NOTE(review): the ids returned below are the same values this file
    # accepts as the only proof of identity (here and on the WebSocket
    # route), so this response hands each caller every peer's credential.
    # Fixing that needs a separate public id and is an interface change.
    if user_id not in users:
        raise HTTPException(401, "Invalid session")

    others = []
    for uid, record in users.items():
        if not record["online"] or uid == user_id:
            continue
        others.append({"id": uid, "name": record["name"]})
    return {"users": others}
|
|
@app.post("/api/rename")
async def rename(data: dict):
    """Change a user's display name and notify connected peers.

    Args:
        data: JSON object with ``user_id`` and ``display_name``.

    Raises:
        HTTPException: 401 when ``user_id`` is missing, not a string, or not
            registered; 400 when ``display_name`` is not a string or is not
            1-32 characters after stripping.

    Returns:
        A dict with the new ``display_name`` and the ``previous_name``.
    """
    uid = data.get("user_id", "")
    raw_name = data.get("display_name", "")

    # The body is an arbitrary JSON object, so either field may be any JSON
    # type. Check the type before the dict lookup / ``.strip()``: an
    # unhashable id or a non-string name would otherwise raise and surface
    # as a 500 instead of a client error.
    if not isinstance(uid, str) or uid not in users:
        raise HTTPException(401, "Invalid session")
    if not isinstance(raw_name, str):
        raise HTTPException(400, "Display name must be 1-32 characters")

    name = raw_name.strip()
    if not name or len(name) > 32:
        raise HTTPException(400, "Display name must be 1-32 characters")

    old = users[uid]["name"]
    users[uid]["name"] = name

    # Mirror the WebSocket rename path so peers do not keep showing the old
    # name. Iterate over a snapshot because each send awaits and the
    # connection table may change in the meantime.
    notice = json.dumps({"type": "user_renamed", "id": uid, "name": name})
    for cid, conn in list(connections.items()):
        if cid == uid:
            continue
        try:
            await conn.send_text(notice)
        except Exception:
            # Best-effort fan-out: one broken peer socket must not fail the
            # rename request or block delivery to the remaining peers.
            pass

    return {"display_name": name, "previous_name": old}
|
|
async def _send_quietly(conn: WebSocket, text: str) -> None:
    """Send ``text`` on ``conn``; a failed delivery is ignored."""
    try:
        await conn.send_text(text)
    except Exception:
        # Deliberate best-effort: a peer whose socket is already broken is
        # unregistered by its own handler, and one bad socket must not stop
        # delivery to the others.
        pass


async def _broadcast(payload: dict, exclude: str) -> None:
    """Send ``payload`` as JSON to every registered socket except ``exclude``."""
    text = json.dumps(payload)
    # Iterate over a snapshot: every send awaits, and other handlers may add
    # or remove entries meanwhile, which would make iteration over the live
    # dict raise "dictionary changed size during iteration".
    for cid, conn in list(connections.items()):
        if cid != exclude:
            await _send_quietly(conn, text)


async def _relay_message(websocket: WebSocket, user_id: str, msg: dict) -> None:
    """Validate a "message" frame, deliver it to the target and echo it back."""
    target = msg.get("target_id", "")
    content = msg.get("content", "")
    # Frames with a non-string, empty or oversized body are dropped silently.
    if not isinstance(content, str):
        return
    content = content.strip()
    if not content or len(content) > 4000:
        return
    # The isinstance check also keeps unhashable JSON values away from the
    # dict membership test.
    if not isinstance(target, str) or target not in users:
        await websocket.send_text(json.dumps({"type": "error", "msg": "User not found"}))
        return

    payload = json.dumps({
        "type": "message",
        "from_id": user_id,
        "from_name": users[user_id]["name"],
        "content": content,
        "room_id": room_id(user_id, target),
        "ts": datetime.now(timezone.utc).isoformat(),
    })

    recipient = connections.get(target)
    if recipient is not None:
        await _send_quietly(recipient, payload)
    # The sender always gets an echo, even when the target is not connected.
    await websocket.send_text(payload)


@app.websocket("/ws/{user_id}")
async def ws(websocket: WebSocket, user_id: str):
    """Serve one user's chat socket.

    Unknown ids are closed with code 4001. Otherwise the socket is accepted,
    registered, and peers are told the user is online. Incoming text frames
    are JSON objects dispatched on their ``type`` field:

    * ``message`` - relay ``content`` to ``target_id`` and echo to the sender.
    * ``typing``  - forward a typing indicator to ``target_id`` if connected.
    * ``rename``  - change the display name and tell the other peers.

    Frames that are not a JSON object get an ``error`` reply; frames with an
    unrecognised ``type`` are ignored. On exit the user is marked offline
    and peers are notified.

    NOTE(review): ``user_id`` is the only credential checked here and it is
    also sent to peers as ``id`` / ``from_id``; confirm whether a separate
    public identifier is intended.
    """
    if user_id not in users:
        await websocket.close(code=4001)
        return
    await websocket.accept()
    connections[user_id] = websocket
    users[user_id]["online"] = True

    await _broadcast(
        {"type": "user_online", "id": user_id, "name": users[user_id]["name"]},
        exclude=user_id,
    )

    try:
        while True:
            raw = await websocket.receive_text()
            try:
                msg = json.loads(raw)
            except json.JSONDecodeError:
                msg = None
            # Anything that is not a JSON object cannot be dispatched; reply
            # with an error instead of letting the handler crash.
            if not isinstance(msg, dict):
                await websocket.send_text(json.dumps({"type": "error", "msg": "Invalid message"}))
                continue

            kind = msg.get("type")

            if kind == "message":
                await _relay_message(websocket, user_id, msg)

            elif kind == "typing":
                target = msg.get("target_id", "")
                peer = connections.get(target) if isinstance(target, str) else None
                if peer is not None:
                    await _send_quietly(peer, json.dumps({
                        "type": "typing", "from_id": user_id,
                        "from_name": users[user_id]["name"]
                    }))

            elif kind == "rename":
                new_name = msg.get("name", "")
                if isinstance(new_name, str):
                    new_name = new_name.strip()
                    if new_name and len(new_name) <= 32:
                        users[user_id]["name"] = new_name
                        await _broadcast(
                            {"type": "user_renamed", "id": user_id, "name": new_name},
                            exclude=user_id,
                        )

    except WebSocketDisconnect:
        pass
    finally:
        # Only tear down state that still belongs to this socket. If the same
        # id has reconnected, the registry now points at the newer socket and
        # the user must stay registered and online.
        if connections.get(user_id) is websocket:
            del connections[user_id]
            if user_id in users:
                users[user_id]["online"] = False
            await _broadcast({"type": "user_offline", "id": user_id}, exclude=user_id)
|
|