"""PraisonChat HTTP server: FastAPI app, JSON error handlers and basic API routes."""
import json
import os
import traceback
from pathlib import Path

from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import StreamingResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware

import config as cfg
from agent_system import orchestrator
from sandbox import pip_install

app = FastAPI(title="PraisonChat", version="6.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

STATIC_DIR = Path(__file__).parent / "static"
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

# ── Always return JSON errors, never HTML ─────────────────────
from starlette.exceptions import HTTPException as StarletteHTTPException


@app.exception_handler(StarletteHTTPException)
async def json_http_handler(request, exc):
    """Render an HTTP exception as ``{"ok": False, "detail": ...}`` with its own status code."""
    payload = {"ok": False, "detail": str(exc.detail)}
    return JSONResponse(status_code=exc.status_code, content=payload)


@app.exception_handler(Exception)
async def json_generic_handler(request, exc):
    """Print the exception with its traceback and answer with a JSON 500."""
    print(f"[SERVER] {exc}\n{traceback.format_exc()}")
    payload = {"ok": False, "detail": str(exc)}
    return JSONResponse(status_code=500, content=payload)


@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve ``static/index.html`` (read as UTF-8 on every request)."""
    index_file = STATIC_DIR / "index.html"
    return HTMLResponse(index_file.read_text(encoding="utf-8"))


@app.get("/api/health")
def health():
    """Report the app version and whether a LongCat key / Telegram token is configured."""
    has_longcat_key = bool(cfg.get_longcat_key())
    has_telegram_token = bool(cfg.get_telegram_token())
    return {
        "ok": True,
        "version": "6.0.0",
        "longcat_key": has_longcat_key,
        "telegram": has_telegram_token,
    }


@app.get("/api/models")
def models():
    """Return the static catalogue of selectable LongCat models."""
    catalogue = [
        {
            "id": "LongCat-Flash-Lite",
            "name": "LongCat Flash Lite",
            "context": "320K",
            "speed": "⚡ Fastest",
            "quota": "50M/day",
        },
        {
            "id": "LongCat-Flash-Chat",
            "name": "LongCat Flash Chat",
            "context": "256K",
            "speed": "🚀 Fast",
            "quota": "500K/day",
        },
        {
            "id": "LongCat-Flash-Thinking-2601",
            "name": "LongCat Flash Thinking",
            "context": "256K",
            "speed": "🧠 Deep",
            "quota": "500K/day",
        },
    ]
    return {"models": catalogue}


@app.get("/api/memory")
def get_memory():
    """Return whatever ``memory.get_all_for_api()`` produces."""
    # Imported lazily, as in the rest of this module's memory endpoints.
    from memory import get_all_for_api

    return get_all_for_api()
@app.delete("/api/memory/{key}")
def del_memory(key: str):
    """Delete the memory file for ``key``.

    Returns ``{"ok": True}`` when the file was removed and
    ``{"ok": False, "detail": "Not found"}`` when it did not exist.
    """
    from memory import MEM_DIR

    # Every character other than alphanumerics, "-" and "_" becomes "_", so
    # the key cannot add path separators or dots to the file name.
    safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in key)
    path = MEM_DIR / f"{safe}.md"
    try:
        # EAFP: avoids the exists()/unlink() race of a check-then-delete.
        path.unlink()
    except FileNotFoundError:
        return {"ok": False, "detail": "Not found"}
    return {"ok": True}


@app.get("/api/skills")
def get_skills():
    """List skills, attaching the first 800 characters of each skill's code."""
    from memory import list_skills, load_skill

    skills = list_skills()
    for skill in skills:
        skill["code"] = load_skill(skill["name"])[:800]
    return {"skills": skills}


@app.delete("/api/skills/{name}")
def del_skill(name: str):
    """Delete the named skill; ``ok`` carries the result of ``memory.delete_skill``."""
    from memory import delete_skill

    return {"ok": delete_skill(name)}


# ── Chat ───────────────────────────────────────────────────────
@app.post("/api/chat")
async def chat(request: Request):
    """Stream the orchestrator's reply to the last message as server-sent events.

    Expects a JSON object with ``messages`` (list of message dicts), and
    optionally ``api_key`` and ``model``. Responds with 400 when the body is
    not a JSON object, when no API key is available, or when there are no
    messages. Errors raised while streaming are emitted as a final
    ``{"type": "error"}`` event instead of breaking the stream.
    """
    try:
        body = await request.json()
    except Exception:
        return JSONResponse(status_code=400, content={"ok": False, "detail": "Invalid JSON body"})
    if not isinstance(body, dict):
        # Valid JSON that is not an object (e.g. a list) has no ``.get``.
        return JSONResponse(status_code=400, content={"ok": False, "detail": "Invalid JSON body"})

    messages = body.get("messages", [])
    api_key = (body.get("api_key") or "").strip() or cfg.get_longcat_key()
    model = body.get("model", "LongCat-Flash-Lite")

    if not api_key:
        return JSONResponse(status_code=400, content={"ok": False, "detail": "LongCat API key required. Go to Settings (⚙️) and paste your key."})
    if not messages:
        return JSONResponse(status_code=400, content={"ok": False, "detail": "No messages"})

    # Save for Telegram
    if api_key != cfg.get_longcat_key():
        cfg.set_longcat_key(api_key)
    if model != cfg.get_model():
        cfg.set_model(model)

    user_message = messages[-1].get("content", "")
    history = messages[:-1]

    async def stream():
        try:
            async for chunk in orchestrator.stream_response(user_message, history, api_key, model):
                yield f"data: {chunk}\n\n"
        except Exception as e:
            err = json.dumps({"type": "error", "message": str(e)})
            yield f"data: {err}\n\n"

    return StreamingResponse(
        stream(),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache", "Connection": "keep-alive"},
    )


# ── Cross-platform: Web SSE push from Telegram ────────────────
import asyncio as _asyncio

_sse_queues: dict = {}  # session_id -> asyncio.Queue

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-execution.
_pending_tasks: set = set()


@app.get("/api/sse/{session_id}")
async def sse_endpoint(session_id: str, request: Request):
    """SSE stream — pushes messages from Telegram to Web UI."""
    queue = _asyncio.Queue()
    _sse_queues[session_id] = queue

    async def tg_notify(role, content):
        await queue.put(json.dumps({"type": "tg_message", "role": role, "content": content}))

    orchestrator.register_tg_notify(session_id, tg_notify)

    async def event_stream():
        try:
            yield "data: " + json.dumps({"type": "connected", "session": session_id}) + "\n\n"
            while True:
                if await request.is_disconnected():
                    break
                try:
                    # Wake up every 25 s so an idle stream still emits a ping
                    # and the disconnect check above runs again.
                    msg = await _asyncio.wait_for(queue.get(), timeout=25)
                    yield f"data: {msg}\n\n"
                except _asyncio.TimeoutError:
                    yield "data: " + json.dumps({"type": "ping"}) + "\n\n"
        finally:
            # Only drop the registry entry if it is still ours; a newer
            # connection with the same session id may have replaced it.
            if _sse_queues.get(session_id) is queue:
                _sse_queues.pop(session_id, None)
            # NOTE(review): this unregisters by session id only — confirm it
            # cannot remove a callback registered by a newer connection.
            orchestrator.unregister_tg_notify(session_id)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no", "Cache-Control": "no-cache"},
    )


@app.post("/api/send-to-telegram")
async def send_to_telegram(request: Request):
    """Send a web UI message to linked Telegram chat."""
    try:
        body = await request.json()
        chat_id = body.get("chat_id")
        message = body.get("message", "")
        api_key = (body.get("api_key") or "").strip() or cfg.get_longcat_key()
        model = body.get("model", "LongCat-Flash-Lite")
        if not chat_id:
            # NOTE(review): answered with HTTP 200 unlike the other validation
            # errors in this module; left unchanged in case clients rely on it.
            return JSONResponse({"ok": False, "detail": "No chat_id"})

        from telegram_bot import handle_update

        # Shaped like an incoming Telegram update so the bot handler can
        # process the web message unchanged.
        fake_update = {"message": {"chat": {"id": chat_id}, "from": {"first_name": "WebUI"}, "text": message}}
        task = _asyncio.create_task(handle_update(fake_update, api_key, model))
        # Hold a reference until the task finishes (see _pending_tasks).
        _pending_tasks.add(task)
        task.add_done_callback(_pending_tasks.discard)
        return {"ok": True}
    except Exception as e:
        return JSONResponse(status_code=500, content={"ok": False, "detail": str(e)})


# ── Package installer ──────────────────────────────────────────
@app.post("/api/install-package")
async def install_package(request: Request):
    """Run ``sandbox.pip_install`` on the requested packages in a worker thread.

    Responds with 400 when ``packages`` is missing or empty and with 500 when
    anything raises (including an unparsable body).
    """
    try:
        body = await request.json()
        packages = body.get("packages", [])
        if not packages:
            return JSONResponse(status_code=400, content={"ok": False, "detail": "No packages"})
        # NOTE(review): package names come straight from the request body and
        # are handed to pip_install unvalidated — confirm this endpoint is not
        # reachable by untrusted clients.
        loop = _asyncio.get_running_loop()
        ok, msg = await loop.run_in_executor(None, pip_install, packages)
        return {"ok": ok, "message": msg}
    except Exception as e:
        return JSONResponse(status_code=500, content={"ok": False, "detail": str(e)})


# ── Telegram ───────────────────────────────────────────────────
@app.post("/telegram/webhook")
async def telegram_webhook(request: Request, background_tasks: BackgroundTasks):
    """Acknowledge a Telegram update and process it in a background task.

    Always answers ``{"ok": True}``. When no LongCat key is stored, the chat
    is told how to configure one instead of the update being processed.
    """
    from telegram_bot import handle_update

    try:
        update = await request.json()
    except Exception:
        return JSONResponse({"ok": True})

    print(f"[WEBHOOK] {str(update)[:120]}")
    api_key = cfg.get_longcat_key()
    model = cfg.get_model()

    if not api_key:
        try:
            msg = update.get("message", {})
            chat_id = msg.get("chat", {}).get("id")
            if chat_id:
                from telegram_bot import send_message

                await send_message(
                    chat_id,
                    "⚠️ No LongCat API key saved!\n\n"
                    "Open the web app → Settings (⚙️) → paste your LongCat key → Save.\n"
                    "Or set LONGCAT_API_KEY as a HuggingFace Space Secret.",
                )
        except Exception as e:
            # Best effort: a failed notice must not turn the acknowledgement
            # into an error response.
            print(f"[WEBHOOK] could not send missing-key notice: {e}")
        return JSONResponse({"ok": True})

    background_tasks.add_task(handle_update, update, api_key, model)
    return JSONResponse({"ok": True})


def _clean_str(value) -> str:
    """Return ``value`` stripped when it is a string, otherwise an empty string."""
    return value.strip() if isinstance(value, str) else ""


@app.post("/api/telegram/setup")
async def telegram_setup(request: Request):
    """Store the supplied Telegram/LongCat settings and register the webhook.

    Body fields (all optional strings): ``token``, ``base_url``, ``api_key``,
    ``model``. Responds with 400 for an invalid body, a missing bot token or a
    token Telegram rejects; 503 when the Telegram API is unreachable; 500 on
    any other failure while talking to Telegram.
    """
    try:
        body = await request.json()
    except Exception:
        return JSONResponse(status_code=400, content={"ok": False, "detail": "Invalid JSON"})
    if not isinstance(body, dict):
        return JSONResponse(status_code=400, content={"ok": False, "detail": "Invalid JSON"})

    # JSON null or non-string values are treated as "not provided" rather
    # than crashing on ``.strip()``.
    token = _clean_str(body.get("token"))
    base_url = _clean_str(body.get("base_url"))
    api_key = _clean_str(body.get("api_key"))
    model = _clean_str(body.get("model"))

    if token:
        cfg.set_telegram_token(token)
    if api_key:
        cfg.set_longcat_key(api_key)
    if model:
        cfg.set_model(model)

    if not cfg.get_telegram_token():
        return JSONResponse(status_code=400, content={"ok": False, "detail": "No Telegram bot token provided"})

    # Check DNS first
    from telegram_bot import check_telegram_reachable, get_bot_info, set_webhook

    loop = _asyncio.get_running_loop()
    reachable, dns_msg = await loop.run_in_executor(None, check_telegram_reachable)
    if not reachable:
        return JSONResponse(status_code=503, content={
            "ok": False,
            "detail": f"Cannot reach Telegram API from this server. {dns_msg}\n\n"
                      f"Solution: Set TELEGRAM_BOT_TOKEN as a HuggingFace Space Secret "
                      f"(Settings → Variables and Secrets). The bot will still receive "
                      f"messages via webhook even if this setup fails."
        })

    try:
        bot = await get_bot_info()
        if not bot.get("ok"):
            return JSONResponse(status_code=400, content={"ok": False, "detail": f"Invalid bot token: {bot.get('description','check your token')}"})
        webhook_result = {}
        if base_url:
            webhook_result = await set_webhook(base_url)
        return JSONResponse({"ok": True, "bot": bot.get("result", {}), "webhook": webhook_result})
    except Exception as e:
        return JSONResponse(status_code=500, content={"ok": False, "detail": str(e)})


@app.get("/api/telegram/status")
async def telegram_status():
    """Report whether the configured bot token works and what webhook is set.

    Never raises: any failure is returned as
    ``{"connected": False, "message": ...}``.
    """
    try:
        token = cfg.get_telegram_token()
        # Also check env var
        env_token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
        if env_token and not token:
            cfg.set_telegram_token(env_token)
            token = env_token
        if not token:
            return {"connected": False, "message": "No bot token set"}

        from telegram_bot import check_telegram_reachable, get_bot_info, get_webhook_info

        loop = _asyncio.get_running_loop()
        reachable, _ = await loop.run_in_executor(None, check_telegram_reachable)
        if not reachable:
            return {"connected": False, "message": "Token set but api.telegram.org unreachable from server. Use HF Space Secret TELEGRAM_BOT_TOKEN.", "token_set": True}

        bot = await get_bot_info()
        hook = await get_webhook_info()
        return {
            "connected": bot.get("ok", False),
            "bot": bot.get("result", {}),
            "webhook": hook.get("result", {}),
            "longcat_key_set": bool(cfg.get_longcat_key()),
        }
    except Exception as e:
        return {"connected": False, "message": str(e)}


@app.delete("/api/telegram/disconnect")
async def telegram_disconnect():
    """Remove the webhook when Telegram is reachable and clear the stored token.

    Always answers ``{"ok": True}``; ``result`` is included only when the
    normal path completed. The token is cleared even if the webhook removal
    fails.
    """
    try:
        from telegram_bot import delete_webhook, check_telegram_reachable

        loop = _asyncio.get_running_loop()
        reachable, _ = await loop.run_in_executor(None, check_telegram_reachable)
        result = {}
        if reachable:
            result = await delete_webhook()
        cfg.set("telegram_token", "")
        return {"ok": True, "result": result}
    except Exception as e:
        # Best effort by design, but leave a trace instead of failing silently.
        print(f"[TELEGRAM] disconnect failed, clearing token anyway: {e}")
        cfg.set("telegram_token", "")
        return {"ok": True}