algorithm-agent / server.py
fanjingbo111's picture
Deploy algorithm agent app
ae0a268 verified
Raw
History Blame Contribute Delete
4.74 kB
from __future__ import annotations

import json
import os
import time
from pathlib import Path
from typing import Any, Dict

from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles

from agent_core import iter_problem_steps
# Directory containing this module; the paths below are resolved against it.
ROOT_DIR = Path(__file__).resolve().parent
# Front-end assets: served under /static and the source of index.html.
STATIC_DIR = ROOT_DIR / "static"
# Directory the report endpoints read solution_report.pdf / .tex from.
OUTPUT_DIR = ROOT_DIR / "outputs"
# Model identifier and chat-completions endpoint handed to the agent on each run.
AGNES_MODEL = "agnes-2.0-flash"
AGNES_CHAT_COMPLETIONS_URL = "https://apihub.agnes-ai.com/v1/chat/completions"
# ASGI application object; the route decorators below register against it.
app = FastAPI(title="General Algorithm Problem Solving Agent")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Keys and values are stripped of surrounding whitespace, and a
    value wrapped in one matching pair of single or double quotes has that
    pair removed. Variables that are already present in the environment are
    never overwritten.

    Args:
        path: Location of the env file. If it does not exist or is not a
            regular file, the function does nothing.
    """
    # is_file() rather than exists(): a directory at this path would make
    # read_text() raise.
    if not path.is_file():
        return
    # utf-8-sig drops a leading byte-order mark (common in files saved by
    # Windows editors) that would otherwise become part of the first key.
    text = path.read_text(encoding="utf-8-sig", errors="ignore")
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip()
        # Remove exactly one pair of matching outer quotes. Stripping quote
        # characters greedily would also eat quotes that belong to the value
        # itself, e.g. "it's 'ok'" would lose its final apostrophe.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            value = value[1:-1]
        if key:
            # The real environment takes precedence over the file.
            os.environ.setdefault(key, value)
# Import-time side effect: seed os.environ from a .env file next to this
# module, if one exists. Variables already set in the environment win.
load_env_file(ROOT_DIR / ".env")
def sse_event(payload: Dict[str, Any]) -> str:
    """Encode ``payload`` as one Server-Sent Events ``data:`` frame.

    The payload is serialised to JSON with non-ASCII characters kept as-is,
    and the frame is terminated by the blank line that ends an SSE event.
    """
    body = json.dumps(payload, ensure_ascii=False)
    return "data: " + body + "\n\n"
@app.get("/")
def index() -> FileResponse:
    """Serve the front-end entry page, ``index.html`` from the static directory."""
    landing_page = STATIC_DIR / "index.html"
    return FileResponse(landing_page)
@app.get("/api/status")
def status() -> Dict[str, Any]:
    """Report the agent's configuration for the front end.

    Returns:
        A JSON-serialisable dict with the provider and model names, the list
        of workflow stage names, and ``enabled``, which is true only when the
        ``AGNES_API_KEY`` environment variable is set to a non-empty value.
    """
    workflow_stages = [
        "Memory",
        "Planner",
        "Retriever",
        "Executor",
        "Script Runner",
        "Evaluator",
        "Loop Controller",
        "Reflector",
        "Artifact Writer",
    ]
    has_api_key = bool(os.getenv("AGNES_API_KEY", ""))
    return {
        "enabled": has_api_key,
        "provider": "Agnes AI",
        "model": AGNES_MODEL,
        "base_url_configured": True,
        "workflow": workflow_stages,
    }
@app.get("/api/run")
def run_agent(
    question: str = Query(..., min_length=1),
) -> StreamingResponse:
    """Run the agent on ``question`` and stream its progress as Server-Sent Events.

    The response body is a sequence of ``data: <json>`` frames:

    * one ``status`` event acknowledging the request,
    * one ``step`` event per item yielded by ``iter_problem_steps``,
    * then either a ``final`` event (the answer plus the report download
      URLs) or an ``error`` event carrying the exception message,
    * and a closing ``done`` event in both cases.

    Args:
        question: The problem statement; must be at least one character.

    Returns:
        A ``text/event-stream`` response. It is sent with headers asking
        caches and buffering reverse proxies to pass events through as they
        are produced.
    """

    def _timestamp() -> str:
        # Wall-clock time attached to every user-visible event.
        return time.strftime("%H:%M:%S")

    def stream():
        selected_key = os.getenv("AGNES_API_KEY", "")
        yield sse_event(
            {
                "type": "status",
                "title": "Agent 启动",
                "content": "已接收问题,开始初始化 Session Memory 与任务状态。",
                "time": _timestamp(),
            }
        )
        try:
            final_state = None
            steps = iter_problem_steps(
                question,
                api_key=selected_key,
                base_url=AGNES_CHAT_COMPLETIONS_URL,
                model=AGNES_MODEL,
            )
            # step_no, not index: avoids shadowing the index() route handler.
            for step_no, (item, state) in enumerate(steps, start=1):
                # Keep the most recent state; the last one carries the answer.
                final_state = state
                yield sse_event(
                    {
                        "type": "step",
                        "index": step_no,
                        "module": item.get("module", ""),
                        "title": item.get("title", ""),
                        "content": item.get("content", ""),
                        "time": _timestamp(),
                    }
                )
                # NOTE(review): presumably paces the stream so the UI can
                # render steps one by one - confirm before removing.
                time.sleep(0.05)
            if final_state is None:
                raise RuntimeError("Agent 未产生任何输出。")
            yield sse_event(
                {
                    "type": "final",
                    "title": "最终答案",
                    "content": final_state.final_answer,
                    "pdf_url": "/api/report/pdf",
                    "tex_url": "/api/report/tex",
                    "time": _timestamp(),
                }
            )
            yield sse_event({"type": "done"})
        except Exception as exc:
            # Stream boundary: the HTTP status line has already been sent, so
            # failures are reported to the client as an in-band error event.
            yield sse_event(
                {
                    "type": "error",
                    "title": "运行失败",
                    "content": str(exc),
                    "time": _timestamp(),
                }
            )
            yield sse_event({"type": "done"})

    sse_headers = {
        # Events are live data; they must never be served from a cache.
        "Cache-Control": "no-cache",
        # Ask nginx-style reverse proxies not to buffer the stream.
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        stream(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
@app.get("/api/report/pdf")
def report_pdf() -> FileResponse:
    """Download the generated PDF report.

    Returns:
        ``outputs/solution_report.pdf`` as an ``application/pdf`` attachment.

    Raises:
        HTTPException: 404 if the report file does not exist.
    """
    path = OUTPUT_DIR / "solution_report.pdf"
    # Check up front: FileResponse only notices a missing file while the
    # response is being sent, which reaches the client as a 500.
    if not path.is_file():
        raise HTTPException(status_code=404, detail="PDF report not found.")
    return FileResponse(path, filename="solution_report.pdf", media_type="application/pdf")
@app.get("/api/report/tex")
def report_tex() -> FileResponse:
    """Download the generated LaTeX source of the report.

    Returns:
        ``outputs/solution_report.tex`` as a ``text/plain`` attachment.

    Raises:
        HTTPException: 404 if the report file does not exist.
    """
    path = OUTPUT_DIR / "solution_report.tex"
    # Check up front: FileResponse only notices a missing file while the
    # response is being sent, which reaches the client as a 500.
    if not path.is_file():
        raise HTTPException(status_code=404, detail="TeX report not found.")
    return FileResponse(path, filename="solution_report.tex", media_type="text/plain")