Spaces:
Paused
Paused
| import asyncio | |
| import json | |
| import os | |
| import uuid | |
| from datetime import datetime | |
| from json import dumps | |
| from fastapi import Body, FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import (FileResponse, HTMLResponse, JSONResponse, StreamingResponse) | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from pydantic import BaseModel | |
| from loguru import logger | |
| import uvicorn | |
| import aiohttp | |
| app = FastAPI() | |
| OPENMANUS_ENDPOINT_URL = os.getenv("OPENMANUS_ENDPOINT_URL") | |
| if not OPENMANUS_ENDPOINT_URL: | |
| raise EnvironmentError("OPENMANUS_ENDPOINT_URL environment variable must be set") | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| templates = Jinja2Templates(directory="templates") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| class Task(BaseModel): | |
| id: str | |
| prompt: str | |
| created_at: datetime | |
| status: str | |
| steps: list = [] | |
| def model_dump(self, *args, **kwargs): | |
| data = super().model_dump(*args, **kwargs) | |
| data["created_at"] = self.created_at.isoformat() | |
| return data | |
| class TaskManager: | |
| def __init__(self): | |
| self.tasks = {} | |
| self.queues = {} | |
| def create_task(self, prompt: str) -> Task: | |
| task_id = str(uuid.uuid4()) | |
| task = Task( | |
| id=task_id, prompt=prompt, created_at=datetime.now(), status="pending" | |
| ) | |
| self.tasks[task_id] = task | |
| self.queues[task_id] = asyncio.Queue() | |
| return task | |
| async def update_task_step( | |
| self, task_id: str, step: int, result: str, step_type: str = "step" | |
| ): | |
| if task_id in self.tasks: | |
| task = self.tasks[task_id] | |
| task.steps.append({"step": step, "result": result, "type": step_type}) | |
| await self.queues[task_id].put( | |
| {"type": step_type, "step": step, "result": result} | |
| ) | |
| await self.queues[task_id].put( | |
| {"type": "status", "status": task.status, "steps": task.steps} | |
| ) | |
| async def complete_task(self, task_id: str): | |
| if task_id in self.tasks: | |
| task = self.tasks[task_id] | |
| task.status = "completed" | |
| await self.queues[task_id].put( | |
| {"type": "status", "status": task.status, "steps": task.steps} | |
| ) | |
| await self.queues[task_id].put({"type": "complete"}) | |
| async def fail_task(self, task_id: str, error: str): | |
| if task_id in self.tasks: | |
| self.tasks[task_id].status = f"failed: {error}" | |
| await self.queues[task_id].put({"type": "error", "message": error}) | |
| task_manager = TaskManager() | |
| def get_available_themes(): | |
| """扫描themes目录获取所有可用主题""" | |
| themes_dir = "static/themes" | |
| if not os.path.exists(themes_dir): | |
| return [{"id": "openmanus", "name": "Manus", "description": "默认主题"}] | |
| themes = [] | |
| for item in os.listdir(themes_dir): | |
| theme_path = os.path.join(themes_dir, item) | |
| if os.path.isdir(theme_path): | |
| # 验证主题文件夹是否包含必要的文件 | |
| templates_dir = os.path.join(theme_path, "templates") | |
| static_dir = os.path.join(theme_path, "static") | |
| config_file = os.path.join(theme_path, "theme.json") | |
| if os.path.exists(templates_dir) and os.path.exists(static_dir): | |
| if os.path.exists(os.path.join(templates_dir, "chat.html")): | |
| theme_info = {"id": item, "name": item, "description": ""} | |
| # 如果有配置文件,读取主题名称和描述 | |
| if os.path.exists(config_file): | |
| try: | |
| with open(config_file, "r", encoding="utf-8") as f: | |
| config = json.load(f) | |
| theme_info["name"] = config.get("name", item) | |
| theme_info["description"] = config.get( | |
| "description", "" | |
| ) | |
| except Exception as e: | |
| print(f"读取主题配置文件出错: {str(e)}") | |
| themes.append(theme_info) | |
| # 确保Normal主题始终存在 | |
| normal_exists = any(theme["id"] == "openmanus" for theme in themes) | |
| if not normal_exists: | |
| themes.append({"id": "openmanus", "name": "Manus", "description": "默认主题"}) | |
| return themes | |
| async def index(request: Request): | |
| # 获取可用主题列表 | |
| themes = get_available_themes() | |
| # 对主题进行排序:Normal在前,cyberpunk在后,其他主题按原顺序 | |
| sorted_themes = [] | |
| normal_theme = None | |
| cyberpunk_theme = None | |
| other_themes = [] | |
| for theme in themes: | |
| if theme["id"] == "openmanus": | |
| normal_theme = theme | |
| elif theme["id"] == "cyberpunk": | |
| cyberpunk_theme = theme | |
| else: | |
| other_themes.append(theme) | |
| # 按照指定顺序组合主题 | |
| if normal_theme: | |
| sorted_themes.append(normal_theme) | |
| if cyberpunk_theme: | |
| sorted_themes.append(cyberpunk_theme) | |
| sorted_themes.extend(other_themes) | |
| return templates.TemplateResponse( | |
| "index.html", {"request": request, "themes": sorted_themes} | |
| ) | |
| async def chat(request: Request): | |
| theme = request.query_params.get("theme", "openmanus") | |
| # 尝试从主题文件夹加载chat.html | |
| theme_chat_path = f"static/themes/{theme}/templates/chat.html" | |
| if os.path.exists(theme_chat_path): | |
| with open(theme_chat_path, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| # 读取主题配置文件 | |
| theme_config_path = f"static/themes/{theme}/theme.json" | |
| theme_name = theme | |
| if os.path.exists(theme_config_path): | |
| try: | |
| with open(theme_config_path, "r", encoding="utf-8") as f: | |
| config = json.load(f) | |
| theme_name = config.get("name", theme) | |
| except Exception: | |
| pass | |
| # 将主题名称添加到HTML标题中 | |
| content = content.replace( | |
| "<title>Manus</title>", f"<title>Manus - {theme_name}</title>" | |
| ) | |
| return HTMLResponse(content=content) | |
| else: | |
| # 默认使用templates中的chat.html | |
| return templates.TemplateResponse("chat.html", {"request": request}) | |
| async def download_file(file_path: str): | |
| if not os.path.exists(file_path): | |
| raise HTTPException(status_code=404, detail="File not found") | |
| return FileResponse(file_path, filename=os.path.basename(file_path)) | |
| async def create_task(prompt: str = Body(..., embed=True)): | |
| task = task_manager.create_task(prompt) | |
| asyncio.create_task(run_task(task.id, prompt)) | |
| return {"task_id": task.id} | |
| async def run_task(task_id: str, prompt: str): | |
| try: | |
| task_manager.tasks[task_id].status = "running" | |
| async def on_think(thought): | |
| await task_manager.update_task_step(task_id, 0, thought, "think") | |
| async def on_tool_execute(tool, input): | |
| await task_manager.update_task_step( | |
| task_id, 0, f"Executing tool: {tool}\nInput: {input}", "tool" | |
| ) | |
| async def on_action(action): | |
| await task_manager.update_task_step( | |
| task_id, 0, f"Executing action: {action}", "act" | |
| ) | |
| async def on_run(step, result): | |
| await task_manager.update_task_step(task_id, step, result, "run") | |
| class SSELogHandler: | |
| def __init__(self, task_id): | |
| self.task_id = task_id | |
| async def __call__(self, message): | |
| import re | |
| # Extract - Subsequent Content | |
| cleaned_message = re.sub(r"^.*? - ", "", message) | |
| cleaned_message = re.sub(r"^.*? - ", "", cleaned_message) | |
| event_type = "log" | |
| if "✨ Manus's thoughts:" in cleaned_message: | |
| event_type = "think" | |
| elif "🛠️ Manus selected" in cleaned_message: | |
| event_type = "tool" | |
| elif "🎯 Tool" in cleaned_message: | |
| event_type = "act" | |
| elif "📝 Oops!" in cleaned_message: | |
| event_type = "error" | |
| elif "🏁 Special tool" in cleaned_message: | |
| event_type = "complete" | |
| elif "🎉 Manus result:" in cleaned_message: | |
| event_type = "result" | |
| cleaned_message = cleaned_message.replace("🎉 Manus result:", "") | |
| await task_manager.update_task_step( | |
| self.task_id, 1, cleaned_message, event_type | |
| ) | |
| return | |
| await task_manager.update_task_step( | |
| self.task_id, 0, cleaned_message, event_type | |
| ) | |
| sse_handler = SSELogHandler(task_id) | |
| logger.add(sse_handler) | |
| import re | |
| def has_log_prefix(message): | |
| # 检查字符串是否包含两个 "|" 且以 " - " 分割前缀和内容 | |
| return re.match(r"^.*?\|.*?\|.*? - ", message) is not None | |
| async def call_manus(url: str, prompt: str): | |
| generate_kwargs = { | |
| "prompt": prompt, | |
| } | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post( | |
| url=url, | |
| json=generate_kwargs, | |
| timeout=aiohttp.ClientTimeout(total=3600) | |
| ) as response: | |
| buffer = "" | |
| async for line in response.content: | |
| decode_line = line.decode('utf-8') | |
| if has_log_prefix(decode_line) and len(buffer)>0: | |
| logger.info(buffer) | |
| buffer = "" | |
| else: | |
| buffer += decode_line | |
| if buffer: | |
| logger.info(buffer) | |
| await call_manus(OPENMANUS_ENDPOINT_URL, prompt) | |
| await task_manager.update_task_step(task_id, 1, "", "result") | |
| await task_manager.complete_task(task_id) | |
| except Exception as e: | |
| await task_manager.fail_task(task_id, str(e)) | |
| async def task_events(task_id: str): | |
| async def event_generator(): | |
| if task_id not in task_manager.queues: | |
| yield f"event: error\ndata: {dumps({'message': 'Task not found'})}\n\n" | |
| return | |
| queue = task_manager.queues[task_id] | |
| task = task_manager.tasks.get(task_id) | |
| if task: | |
| yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n" | |
| while True: | |
| try: | |
| event = await queue.get() | |
| formatted_event = dumps(event) | |
| yield ": heartbeat\n\n" | |
| if event["type"] == "complete": | |
| yield f"event: complete\ndata: {formatted_event}\n\n" | |
| break | |
| elif event["type"] == "error": | |
| yield f"event: error\ndata: {formatted_event}\n\n" | |
| break | |
| elif event["type"] == "step": | |
| task = task_manager.tasks.get(task_id) | |
| if task: | |
| yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n" | |
| yield f"event: {event['type']}\ndata: {formatted_event}\n\n" | |
| elif event["type"] in ["think", "tool", "act", "run"]: | |
| yield f"event: {event['type']}\ndata: {formatted_event}\n\n" | |
| else: | |
| yield f"event: {event['type']}\ndata: {formatted_event}\n\n" | |
| except asyncio.CancelledError: | |
| print(f"Client disconnected for task {task_id}") | |
| break | |
| except Exception as e: | |
| print(f"Error in event stream: {str(e)}") | |
| yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n" | |
| break | |
| return StreamingResponse( | |
| event_generator(), | |
| media_type="text/event-stream", | |
| headers={ | |
| "Cache-Control": "no-cache", | |
| "Connection": "keep-alive", | |
| "X-Accel-Buffering": "no", | |
| }, | |
| ) | |
| async def get_tasks(): | |
| sorted_tasks = sorted( | |
| task_manager.tasks.values(), key=lambda task: task.created_at, reverse=True | |
| ) | |
| return JSONResponse( | |
| content=[task.model_dump() for task in sorted_tasks], | |
| headers={"Content-Type": "application/json"}, | |
| ) | |
| async def get_task(task_id: str): | |
| if task_id not in task_manager.tasks: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| return task_manager.tasks[task_id] | |
| async def generic_exception_handler(request: Request, exc: Exception): | |
| return JSONResponse( | |
| status_code=500, content={"message": f"Server error: {str(exc)}"} | |
| ) | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |