gChat / app.py
Sebebeb's picture
Upload 3 files
136c1a8 verified
Raw
History Blame Contribute Delete
5.2 kB
import json
import secrets
import hashlib
from datetime import datetime, timezone
from typing import Dict
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
# FastAPI application object; every route below is registered on it.
app = FastAPI(title="Private Chat")
# Jinja2 loader rooted at the "templates" directory (used by index()).
templates = Jinja2Templates(directory="templates")
# In-memory user registry: user id (the token handed out by /api/join)
# -> {"name": str, "online": bool, "joined": ISO-8601 timestamp}.
users: Dict[str, dict] = {}
# Currently open WebSocket for each connected user id.
connections: Dict[str, WebSocket] = {}
def generate_token() -> str:
    """Return a fresh URL-safe random token derived from 48 random bytes."""
    token = secrets.token_urlsafe(nbytes=48)
    return token
def room_id(a: str, b: str) -> str:
    """Return a 16-hex-character room identifier for the pair (a, b).

    The two ids are sorted before hashing, so the result does not depend on
    argument order.
    """
    first, second = sorted((a, b))
    digest = hashlib.sha256((first + second).encode())
    return digest.hexdigest()[:16]
class JoinRequest(BaseModel):
    # Request body for POST /api/join. The join() handler trims the value and
    # enforces the 1-32 character limit itself.
    display_name: str
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the "index.html" template for the site root."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.post("/api/join")
async def join(body: JoinRequest):
    """Register a new user and return their generated id.

    The display name is stripped of surrounding whitespace and must then be
    1-32 characters long; otherwise an HTTP 400 is raised. The new user is
    stored in ``users`` under a freshly generated token and flagged online.

    Returns:
        ``{"user_id": <token>, "display_name": <trimmed name>}``
    """
    display_name = body.display_name.strip()
    if not (1 <= len(display_name) <= 32):
        raise HTTPException(400, "Display name must be 1-32 characters")

    token = generate_token()
    joined_at = datetime.now(timezone.utc).isoformat()
    users[token] = {
        "name": display_name,
        "online": True,
        "joined": joined_at,
    }
    return {"user_id": token, "display_name": display_name}
@app.get("/api/users/{user_id}")
async def list_users(user_id: str):
    """List every online user other than the caller.

    Raises HTTP 401 when ``user_id`` is not a known user id.

    Returns:
        ``{"users": [{"id": ..., "name": ...}, ...]}``
    """
    if user_id not in users:
        raise HTTPException(401, "Invalid session")

    others = []
    for uid, info in users.items():
        # Skip the caller and anyone currently flagged offline.
        if uid == user_id or not info["online"]:
            continue
        others.append({"id": uid, "name": info["name"]})
    return {"users": others}
@app.post("/api/rename")
async def rename(data: dict):
    """Change the display name of an existing user.

    Expects a JSON object with ``user_id`` and ``display_name``.

    Raises:
        HTTPException(401): ``user_id`` is missing, not a string, or unknown.
        HTTPException(400): ``display_name`` is missing, not a string, or not
            1-32 characters long after trimming.

    Returns:
        ``{"display_name": <new name>, "previous_name": <old name>}``
    """
    uid = data.get("user_id", "")
    raw_name = data.get("display_name", "")

    # The body is an untyped dict, so values may be any JSON type. Reject
    # non-string ids up front: an unhashable value (list/object) would make
    # the dict lookup itself raise and surface as a 500.
    if not isinstance(uid, str) or uid not in users:
        raise HTTPException(401, "Invalid session")

    # A non-string name (null, number, ...) has no .strip(); treat it as an
    # invalid name rather than crashing.
    name = raw_name.strip() if isinstance(raw_name, str) else ""
    if not name or len(name) > 32:
        raise HTTPException(400, "Display name must be 1-32 characters")

    old = users[uid]["name"]
    users[uid]["name"] = name
    return {"display_name": name, "previous_name": old}
async def _send_quietly(conn: WebSocket, payload: str) -> None:
    """Send *payload* to one peer, ignoring delivery failures."""
    try:
        await conn.send_text(payload)
    except Exception:
        # Best-effort delivery: a peer whose socket is already dead must not
        # break the sender's session.
        pass


async def _broadcast(payload: str, exclude: str = "") -> None:
    """Send *payload* to every connected user except the id *exclude*."""
    # Iterate over a snapshot: each send awaits, and other handlers may add
    # or remove entries in `connections` in the meantime, which would raise
    # "dictionary changed size during iteration" on the live view.
    for cid, conn in list(connections.items()):
        if cid != exclude:
            await _send_quietly(conn, payload)


@app.websocket("/ws/{user_id}")
async def ws(websocket: WebSocket, user_id: str):
    """Run the chat session for one user over a WebSocket.

    Unknown user ids are rejected by closing with code 4001. Otherwise the
    socket is accepted, registered in ``connections``, the user is flagged
    online and all other connected users receive a ``user_online`` event.

    Incoming frames are JSON objects dispatched on their ``type`` field:

    * ``message`` - forward ``content`` (1-4000 chars after trimming) to
      ``target_id`` if that user is connected, and echo it to the sender.
      An unknown target yields an ``error`` frame to the sender.
    * ``typing`` - forward a typing indicator to ``target_id`` if connected.
    * ``rename`` - update the sender's name (1-32 chars after trimming) and
      notify all other connected users with ``user_renamed``.

    Frames that are not valid JSON, are not JSON objects, or carry fields of
    the wrong type are ignored instead of terminating the session.

    When the session ends, the user is flagged offline and a ``user_offline``
    event is broadcast, unless a newer socket has replaced this one.
    """
    if user_id not in users:
        await websocket.close(code=4001)
        return

    await websocket.accept()
    connections[user_id] = websocket
    users[user_id]["online"] = True
    await _broadcast(
        json.dumps({
            "type": "user_online", "id": user_id, "name": users[user_id]["name"]
        }),
        exclude=user_id,
    )

    try:
        while True:
            raw = await websocket.receive_text()
            try:
                msg = json.loads(raw)
            except ValueError:
                # Malformed JSON from the client: drop the frame.
                continue
            if not isinstance(msg, dict):
                # Valid JSON but not an object (e.g. a list or number).
                continue

            kind = msg.get("type")
            if kind == "message":
                target = msg.get("target_id", "")
                content = msg.get("content", "")
                if not isinstance(content, str):
                    continue
                content = content.strip()
                if not content or len(content) > 4000:
                    continue
                # The isinstance guard also protects the dict lookup from
                # unhashable JSON values (lists / objects).
                if not isinstance(target, str) or target not in users:
                    await websocket.send_text(
                        json.dumps({"type": "error", "msg": "User not found"})
                    )
                    continue
                rid = room_id(user_id, target)
                payload = json.dumps({
                    "type": "message",
                    "from_id": user_id,
                    "from_name": users[user_id]["name"],
                    "content": content,
                    "room_id": rid,
                    "ts": datetime.now(timezone.utc).isoformat()
                })
                if target in connections:
                    await _send_quietly(connections[target], payload)
                # Echo to the sender so their client gets the same payload.
                await websocket.send_text(payload)
            elif kind == "typing":
                target = msg.get("target_id", "")
                if isinstance(target, str) and target in connections:
                    await _send_quietly(connections[target], json.dumps({
                        "type": "typing", "from_id": user_id,
                        "from_name": users[user_id]["name"]
                    }))
            elif kind == "rename":
                new_name = msg.get("name", "")
                if not isinstance(new_name, str):
                    continue
                new_name = new_name.strip()
                if new_name and len(new_name) <= 32:
                    users[user_id]["name"] = new_name
                    await _broadcast(
                        json.dumps({
                            "type": "user_renamed", "id": user_id, "name": new_name
                        }),
                        exclude=user_id,
                    )
    except WebSocketDisconnect:
        pass
    finally:
        # Only tear down state that still belongs to this socket. If the same
        # user reconnected, `connections[user_id]` now holds the newer socket
        # and the user must stay online.
        if connections.get(user_id) is websocket:
            connections.pop(user_id, None)
            if user_id in users:
                users[user_id]["online"] = False
            await _broadcast(json.dumps({
                "type": "user_offline", "id": user_id
            }))