| |
|
|
import json
import re
import uuid
from datetime import datetime
from typing import List
from urllib.parse import quote

import bleach
from fastapi import APIRouter, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from starlette.status import HTTP_303_SEE_OTHER

from tools.storage import Storage
|
|
# Module-level singletons shared by every route handler in this file.
router = APIRouter()
templates = Jinja2Templates(
    directory="notebook/templates",
)
storage = Storage()
|
|
# HTML whitelist handed to bleach: any tag not listed here is removed from
# user-submitted text.
allowed_tags = [
    'b', 'i', 's', 'u', 'a',
    'ol', 'ul', 'li',
    'dl', 'dt', 'dd',
    'table', 'caption', 'tr', 'th', 'td',
    'code', 'pre', 'blockquote',
    'br', 'hr',
]
# Attributes kept per tag; only links may carry attributes.
allowed_attributes = {'a': ['href', 'title']}
|
|
| |
def format_timestamp(value):
    """Render a Unix timestamp as ``YYYY-MM-DD HH:MM:SS`` in local time.

    The function is best-effort: a value that cannot be interpreted as a
    timestamp is returned unchanged as a string instead of raising.

    Args:
        value: Seconds since the epoch, as a number or a numeric string.

    Returns:
        The formatted local date-time, or ``str(value)`` when ``value`` is not
        a usable timestamp.
    """
    try:
        moment = datetime.fromtimestamp(float(value))
        return moment.strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError, OverflowError, OSError):
        # TypeError / ValueError: the value is not numeric (or is NaN).
        # OverflowError / OSError: numeric, but outside the range the platform
        # can convert. Anything else is a real bug and should surface.
        return str(value)
|
|
# Expose the formatter to templates as the `format_timestamp` filter.
templates.env.filters["format_timestamp"] = format_timestamp
|
|
| |
def sanitize_html(text: str) -> str:
    """Strip non-whitelisted markup from ``text`` and collapse long break runs.

    bleach keeps only ``allowed_tags`` / ``allowed_attributes`` and removes
    (rather than escapes) everything else. Afterwards any run of three or more
    consecutive ``<br>`` tags is reduced to exactly two.
    """
    whitelisted = bleach.clean(
        text,
        tags=allowed_tags,
        attributes=allowed_attributes,
        strip=True,
    )
    break_run = re.compile(r'(<br\s*/?>\s*){3,}', flags=re.IGNORECASE)
    return break_run.sub('<br><br>', whitelisted)
|
|
| |
def extract_mentions_and_hashtags(text: str):
    """Collect the ``@mention`` and ``#hashtag`` tokens found in ``text``.

    Returns:
        A ``(mentions, hashtags)`` pair of lists in order of appearance, with
        the leading ``@`` / ``#`` removed. Duplicates are kept.
    """
    # Mentions may contain word characters plus ':', '.' and '-';
    # hashtags are word characters only.
    patterns = (r'@([\w:.-]+)', r'#(\w+)')
    mentions, hashtags = (re.findall(pattern, text) for pattern in patterns)
    return mentions, hashtags
|
|
@router.get("/chat")
def chat_page(request: Request):
    """Render the chat page with the signed-in user's 20 newest visible notes.

    Visitors whose session has no ``did`` are redirected to ``/login``.
    """
    session = request.session
    did = session.get("did")
    username = session.get("username")
    if not did:
        return RedirectResponse("/login", status_code=303)

    query = "SELECT text, timestamp, source FROM notes WHERE hidden=0 AND user_did=? ORDER BY timestamp DESC LIMIT 20"
    notes = storage.fetchall(query, (did,))

    context = {
        "request": request,
        "notes": notes,
        "username": username
    }
    return templates.TemplateResponse("chat.html", context)
|
|
@router.post("/chat")
def submit_note(request: Request, message: str = Form(...)):
    """Store a non-blank chat message as a note, then redirect to ``/chat``.

    The note is attributed to the session's ``did``, or to ``"anon"`` when the
    session has none. Blank (whitespace-only) messages are ignored.
    """
    author = request.session.get("did", "anon")
    body = message.strip()
    if body:
        storage.write_note(content=body, user_did=author, source="user")
    return RedirectResponse(url="/chat", status_code=303)
|
|
@router.get("/messages")
def show_messages(request: Request, only_personal: bool = False):
    """Render the message list (up to 50 entries) for the signed-in user.

    ``only_personal`` is forwarded to the storage layer and echoed back to the
    template. Visitors whose session has no ``did`` are redirected to
    ``/login``.
    """
    session = request.session
    did = session.get("did")
    username = session.get("username")
    if not did:
        return RedirectResponse("/login", status_code=303)

    messages = storage.get_notes(limit=50, user_did=did, only_personal=only_personal)
    context = {
        "request": request,
        "messages": messages,
        "only_personal": only_personal,
        "username": username
    }
    return templates.TemplateResponse("messages.html", context)
|
|
@router.post("/messages")
async def post_message(
    request: Request,
    text: str = Form(...),
    code: str = Form(None),
    hidden: str = Form(default="false"),
    binary_files: List[UploadFile] = File(default=[])
):
    """Store a submitted message with optional code and attachments.

    The author is the session's ``did`` (``"anon"`` when absent). Banned
    authors receive a 403 response. The text is sanitized before storage and
    its mentions / hashtags are saved alongside it as JSON arrays. Uploaded
    files with non-empty content are saved as attachments of the new message.

    Args:
        request: Incoming request; only its session is read.
        text: Message body (HTML allowed, sanitized before storage).
        code: Optional code snippet; blank or whitespace-only counts as absent.
        hidden: ``"true"`` (case-insensitive) marks the message as hidden.
        binary_files: Uploaded attachments.

    Returns:
        A 303 redirect to ``/messages``, or a 403 response for banned authors.
    """
    did = request.session.get("did", "anon")
    is_hidden = 1 if hidden.lower() == "true" else 0

    if storage.is_banned(did):
        return HTMLResponse(content="Вы забанены и не можете отправлять сообщения.", status_code=403)

    # Normalize once, before deciding whether there is anything to store.
    # A whitespace-only `code` field used to pass the check below and create
    # an empty message carrying code="".
    stripped_text = text.strip() if text else ""
    stripped_code = (code.strip() or None) if code else None

    if stripped_text or stripped_code or binary_files:
        safe_text = sanitize_html(stripped_text) if stripped_text else ""

        # Mentions and hashtags are extracted from the sanitized text.
        mentions, hashtags = extract_mentions_and_hashtags(safe_text)

        agent_did = storage.get_config_value("agent_id")

        message_id = storage.write_note_returning_id(
            content=safe_text,
            user_did=did,
            agent_did=agent_did,
            source="user",
            hidden=is_hidden,
            code=stripped_code,
            mentions=json.dumps(mentions, ensure_ascii=False),
            hashtags=json.dumps(hashtags, ensure_ascii=False)
        )

        # Uploads are read one at a time; empty ones are skipped.
        for upload in binary_files:
            data = await upload.read()
            if data:
                storage.save_attachment(
                    message_id=message_id,
                    filename=upload.filename,
                    mime_type=upload.content_type,
                    content=data
                )

    return RedirectResponse(url="/messages", status_code=303)
|
|
def _content_disposition(filename):
    """Build an ``attachment`` Content-Disposition value safe for any filename."""
    name = filename or "download"
    # The plain `filename=` parameter must be ASCII and must not contain
    # characters that could end the quoted string or inject another header.
    fallback = name.encode("ascii", "ignore").decode("ascii")
    fallback = re.sub(r'[\x00-\x1f\x7f"\\]', "_", fallback).strip() or "download"
    # `filename*` carries the exact UTF-8 name, percent-encoded (RFC 5987).
    encoded = quote(name, safe="")
    return f"attachment; filename=\"{fallback}\"; filename*=UTF-8''{encoded}"


@router.get("/download/{file_id}")
def download_file(file_id: int):
    """Send a stored attachment to the client as a file download.

    Args:
        file_id: Identifier of the attachment in storage.

    Returns:
        A streaming response carrying the attachment bytes, its stored MIME
        type, and a Content-Disposition header naming the file.

    Raises:
        HTTPException: 404 when no attachment with ``file_id`` exists.
    """
    attachment = storage.get_attachment_by_id(file_id)
    if not attachment:
        raise HTTPException(status_code=404, detail="Файл не найден")

    # NOTE(review): no ownership or visibility check happens here; any caller
    # who knows a file id can download it. Confirm whether that is intended.
    return StreamingResponse(
        iter([attachment["binary"]]),
        media_type=attachment["mime_type"],
        headers={
            # The raw filename is never interpolated directly: non-Latin-1
            # names cannot be encoded as header bytes, and quotes or control
            # characters would corrupt the header.
            "Content-Disposition": _content_disposition(attachment["filename"])
        }
    )
|
|
@router.get("/login")
def login_page(request: Request):
    """Serve the sign-in form."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
|
|
@router.post("/login")
def login_user(request: Request, mail: str = Form(...), password: str = Form(...)):
    """Sign a user in with e-mail and password.

    On success the user's ``username`` and ``did`` are stored in the session
    and the client is redirected to ``/messages``. On failure the login form
    is rendered again with an error message.
    """
    if not storage.authenticate_user(mail, password):
        failure_context = {
            "request": request,
            "error": "Неверный email или пароль"
        }
        return templates.TemplateResponse("login.html", failure_context)

    profile = storage.get_user_info(mail)
    request.session["username"] = profile["username"]
    request.session["did"] = profile["did"]
    return RedirectResponse("/messages", status_code=HTTP_303_SEE_OTHER)
|
|
@router.get("/register")
def register_page(request: Request):
    """Serve the sign-up form."""
    context = {"request": request}
    return templates.TemplateResponse("register.html", context)
|
|
@router.post("/register")
def register_user(
    request: Request,
    username: str = Form(...),
    mail: str = Form(...),
    password: str = Form(...)
):
    """Create a new account and sign the user in.

    When storage accepts the registration, the new user's ``username`` and
    ``did`` are stored in the session and the client is redirected to
    ``/messages``. Otherwise the sign-up form is rendered again with an error
    message.
    """
    if not storage.register_user(username, mail, password):
        failure_context = {
            "request": request,
            "error": "Пользователь с таким email уже существует"
        }
        return templates.TemplateResponse("register.html", failure_context)

    profile = storage.get_user_info(mail)
    request.session["username"] = profile["username"]
    request.session["did"] = profile["did"]
    return RedirectResponse("/messages", status_code=HTTP_303_SEE_OTHER)
|
|
@router.get("/logout")
def logout(request: Request):
    """Drop all session data and send the visitor to the login page."""
    session = request.session
    session.clear()
    response = RedirectResponse("/login", status_code=HTTP_303_SEE_OTHER)
    return response
|
|