| import json |
| import os, json, traceback |
| from pathlib import Path |
| from fastapi import FastAPI, Request, BackgroundTasks |
| from fastapi.responses import StreamingResponse, HTMLResponse, JSONResponse |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.middleware.cors import CORSMiddleware |
| import config as cfg |
| from agent_system import orchestrator |
| from sandbox import pip_install |
|
|
| app = FastAPI(title="PraisonChat", version="6.0.0") |
| app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) |
|
|
| STATIC_DIR = Path(__file__).parent / "static" |
| app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") |
|
|
|
|
| |
| from starlette.exceptions import HTTPException as StarletteHTTPException |
| @app.exception_handler(StarletteHTTPException) |
| async def json_http_handler(request, exc): |
| return JSONResponse(status_code=exc.status_code, content={"ok":False,"detail":str(exc.detail)}) |
|
|
| @app.exception_handler(Exception) |
| async def json_generic_handler(request, exc): |
| print(f"[SERVER] {exc}\n{traceback.format_exc()}") |
| return JSONResponse(status_code=500, content={"ok":False,"detail":str(exc)}) |
|
|
|
|
| @app.get("/", response_class=HTMLResponse) |
| async def root(): |
| return HTMLResponse((STATIC_DIR / "index.html").read_text(encoding="utf-8")) |
|
|
|
|
| @app.get("/api/health") |
| def health(): |
| return {"ok":True,"version":"6.0.0", |
| "longcat_key":bool(cfg.get_longcat_key()), |
| "telegram":bool(cfg.get_telegram_token())} |
|
|
|
|
| @app.get("/api/models") |
| def models(): |
| return {"models":[ |
| {"id":"LongCat-Flash-Lite", "name":"LongCat Flash Lite", "context":"320K","speed":"β‘ Fastest","quota":"50M/day"}, |
| {"id":"LongCat-Flash-Chat", "name":"LongCat Flash Chat", "context":"256K","speed":"π Fast", "quota":"500K/day"}, |
| {"id":"LongCat-Flash-Thinking-2601", "name":"LongCat Flash Thinking", "context":"256K","speed":"π§ Deep", "quota":"500K/day"}, |
| ]} |
|
|
|
|
| @app.get("/api/memory") |
| def get_memory(): |
| from memory import get_all_for_api |
| return get_all_for_api() |
|
|
| @app.delete("/api/memory/{key}") |
| def del_memory(key: str): |
| from memory import MEM_DIR |
| safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in key) |
| path = MEM_DIR / f"{safe}.md" |
| if path.exists(): |
| path.unlink() |
| return {"ok":True} |
| return {"ok":False,"detail":"Not found"} |
|
|
| @app.get("/api/skills") |
| def get_skills(): |
| from memory import list_skills, load_skill |
| skills = list_skills() |
| for s in skills: |
| s["code"] = load_skill(s["name"])[:800] |
| return {"skills":skills} |
|
|
| @app.delete("/api/skills/{name}") |
| def del_skill(name: str): |
| from memory import delete_skill |
| return {"ok": delete_skill(name)} |
|
|
|
|
| |
| @app.post("/api/chat") |
| async def chat(request: Request): |
| try: |
| body = await request.json() |
| except Exception: |
| return JSONResponse(status_code=400, content={"ok":False,"detail":"Invalid JSON body"}) |
|
|
| messages = body.get("messages", []) |
| api_key = (body.get("api_key") or "").strip() or cfg.get_longcat_key() |
| model = body.get("model", "LongCat-Flash-Lite") |
|
|
| if not api_key: |
| return JSONResponse(status_code=400, |
| content={"ok":False,"detail":"LongCat API key required. Go to Settings (βοΈ) and paste your key."}) |
| if not messages: |
| return JSONResponse(status_code=400, content={"ok":False,"detail":"No messages"}) |
|
|
| |
| if api_key != cfg.get_longcat_key(): cfg.set_longcat_key(api_key) |
| if model != cfg.get_model(): cfg.set_model(model) |
|
|
| user_message = messages[-1].get("content","") |
| history = messages[:-1] |
|
|
| async def stream(): |
| try: |
| async for chunk in orchestrator.stream_response(user_message, history, api_key, model): |
| yield f"data: {chunk}\n\n" |
| except Exception as e: |
| err = json.dumps({"type":"error","message":str(e)}) |
| yield f"data: {err}\n\n" |
|
|
| return StreamingResponse(stream(), media_type="text/event-stream", |
| headers={"X-Accel-Buffering":"no","Cache-Control":"no-cache","Connection":"keep-alive"}) |
|
|
|
|
| |
|
|
| |
| import asyncio as _asyncio |
| _sse_queues: dict = {} |
|
|
| @app.get("/api/sse/{session_id}") |
| async def sse_endpoint(session_id: str, request: Request): |
| """SSE stream β pushes messages from Telegram to Web UI.""" |
| queue = _asyncio.Queue() |
| _sse_queues[session_id] = queue |
|
|
| from agent_system import orchestrator |
| async def tg_notify(role, content): |
| await queue.put(json.dumps({"type":"tg_message","role":role,"content":content})) |
| orchestrator.register_tg_notify(session_id, tg_notify) |
|
|
| async def event_stream(): |
| try: |
| yield "data: " + json.dumps({"type":"connected","session":session_id}) + "\n\n" |
| while True: |
| if await request.is_disconnected(): |
| break |
| try: |
| msg = await _asyncio.wait_for(queue.get(), timeout=25) |
| yield f"data: {msg}\n\n" |
| except _asyncio.TimeoutError: |
| yield "data: " + json.dumps({"type":"ping"}) + "\n\n" |
| finally: |
| _sse_queues.pop(session_id, None) |
| orchestrator.unregister_tg_notify(session_id) |
|
|
| return StreamingResponse(event_stream(), media_type="text/event-stream", |
| headers={"X-Accel-Buffering":"no","Cache-Control":"no-cache"}) |
|
|
|
|
| @app.post("/api/send-to-telegram") |
| async def send_to_telegram(request: Request): |
| """Send a web UI message to linked Telegram chat.""" |
| try: |
| body = await request.json() |
| chat_id = body.get("chat_id") |
| message = body.get("message","") |
| api_key = (body.get("api_key") or "").strip() or cfg.get_longcat_key() |
| model = body.get("model","LongCat-Flash-Lite") |
| if not chat_id: |
| return JSONResponse({"ok":False,"detail":"No chat_id"}) |
| from telegram_bot import handle_update |
| fake_update = {"message":{"chat":{"id":chat_id},"from":{"first_name":"WebUI"},"text":message}} |
| import asyncio as _aio |
| _aio.create_task(handle_update(fake_update, api_key, model)) |
| return {"ok":True} |
| except Exception as e: |
| return JSONResponse(status_code=500, content={"ok":False,"detail":str(e)}) |
|
|
| @app.post("/api/install-package") |
| async def install_package(request: Request): |
| try: |
| body = await request.json() |
| packages = body.get("packages", []) |
| if not packages: |
| return JSONResponse(status_code=400, content={"ok":False,"detail":"No packages"}) |
| import asyncio |
| loop = asyncio.get_event_loop() |
| ok, msg = await loop.run_in_executor(None, pip_install, packages) |
| return {"ok":ok,"message":msg} |
| except Exception as e: |
| return JSONResponse(status_code=500, content={"ok":False,"detail":str(e)}) |
|
|
|
|
| |
| @app.post("/telegram/webhook") |
| async def telegram_webhook(request: Request, background_tasks: BackgroundTasks): |
| from telegram_bot import handle_update |
| try: |
| update = await request.json() |
| except Exception: |
| return JSONResponse({"ok":True}) |
|
|
| print(f"[WEBHOOK] {str(update)[:120]}") |
| api_key = cfg.get_longcat_key() |
| model = cfg.get_model() |
|
|
| if not api_key: |
| msg = update.get("message",{}) |
| chat_id = msg.get("chat",{}).get("id") |
| if chat_id: |
| from telegram_bot import send_message |
| await send_message(chat_id, |
| "β οΈ No LongCat API key saved!\n\n" |
| "Open the web app β Settings (βοΈ) β paste your LongCat key β Save.\n" |
| "Or set LONGCAT_API_KEY as a HuggingFace Space Secret.") |
| return JSONResponse({"ok":True}) |
|
|
| background_tasks.add_task(handle_update, update, api_key, model) |
| return JSONResponse({"ok":True}) |
|
|
|
|
| @app.post("/api/telegram/setup") |
| async def telegram_setup(request: Request): |
| try: |
| body = await request.json() |
| except Exception: |
| return JSONResponse(status_code=400, content={"ok":False,"detail":"Invalid JSON"}) |
|
|
| token = body.get("token","").strip() |
| base_url = body.get("base_url","").strip() |
| api_key = body.get("api_key","").strip() |
| model = body.get("model","").strip() |
|
|
| if token: cfg.set_telegram_token(token) |
| if api_key: cfg.set_longcat_key(api_key) |
| if model: cfg.set_model(model) |
|
|
| if not cfg.get_telegram_token(): |
| return JSONResponse(status_code=400, |
| content={"ok":False,"detail":"No Telegram bot token provided"}) |
|
|
| |
| from telegram_bot import check_telegram_reachable, get_bot_info, set_webhook |
| import asyncio |
| loop = asyncio.get_event_loop() |
| reachable, dns_msg = await loop.run_in_executor(None, check_telegram_reachable) |
| if not reachable: |
| return JSONResponse(status_code=503, content={ |
| "ok": False, |
| "detail": f"Cannot reach Telegram API from this server. {dns_msg}\n\n" |
| f"Solution: Set TELEGRAM_BOT_TOKEN as a HuggingFace Space Secret " |
| f"(Settings β Variables and Secrets). The bot will still receive " |
| f"messages via webhook even if this setup fails." |
| }) |
|
|
| try: |
| bot = await get_bot_info() |
| if not bot.get("ok"): |
| return JSONResponse(status_code=400, |
| content={"ok":False,"detail":f"Invalid bot token: {bot.get('description','check your token')}"}) |
|
|
| webhook_result = {} |
| if base_url: |
| webhook_result = await set_webhook(base_url) |
|
|
| return JSONResponse({"ok":True,"bot":bot.get("result",{}),"webhook":webhook_result}) |
| except Exception as e: |
| return JSONResponse(status_code=500, content={"ok":False,"detail":str(e)}) |
|
|
|
|
| @app.get("/api/telegram/status") |
| async def telegram_status(): |
| try: |
| token = cfg.get_telegram_token() |
| |
| env_token = os.environ.get("TELEGRAM_BOT_TOKEN","") |
| if env_token and not token: |
| cfg.set_telegram_token(env_token) |
| token = env_token |
| if not token: |
| return {"connected":False,"message":"No bot token set"} |
| from telegram_bot import check_telegram_reachable, get_bot_info, get_webhook_info |
| import asyncio |
| loop = asyncio.get_event_loop() |
| reachable, _ = await loop.run_in_executor(None, check_telegram_reachable) |
| if not reachable: |
| return {"connected":False,"message":"Token set but api.telegram.org unreachable from server. Use HF Space Secret TELEGRAM_BOT_TOKEN.","token_set":True} |
| bot = await get_bot_info() |
| hook = await get_webhook_info() |
| return {"connected":bot.get("ok",False),"bot":bot.get("result",{}), |
| "webhook":hook.get("result",{}),"longcat_key_set":bool(cfg.get_longcat_key())} |
| except Exception as e: |
| return {"connected":False,"message":str(e)} |
|
|
|
|
| @app.delete("/api/telegram/disconnect") |
| async def telegram_disconnect(): |
| try: |
| from telegram_bot import delete_webhook, check_telegram_reachable |
| import asyncio |
| loop = asyncio.get_event_loop() |
| reachable, _ = await loop.run_in_executor(None, check_telegram_reachable) |
| result = {} |
| if reachable: |
| result = await delete_webhook() |
| cfg.set("telegram_token","") |
| return {"ok":True,"result":result} |
| except Exception as e: |
| cfg.set("telegram_token","") |
| return {"ok":True} |