algorithm-agent / agent_core.py
fanjingbo111's picture
Deploy algorithm agent app
ae0a268 verified
Raw
History Blame Contribute Delete
18.8 kB
from __future__ import annotations
import json
import re
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from reporting import write_solution_report
from tools import (
AlgorithmPattern,
ScriptResult,
extract_python_code,
format_patterns,
offline_algorithm_answer,
retrieve_algorithm_patterns,
safe_run_python,
)
SYSTEM_PROMPT = """你是一个通用算法问题优化与求解智能体。
你不能依赖内部未公开模型,必须通过公开可访问的大模型 API 或本地轻量工具完成任务。
你的工作流是 Memory -> Planner -> Retriever -> Executor -> Script Runner -> Evaluator -> Loop Controller -> Reflector -> Artifact Writer。
你要解决通用算法问题,而不是为每个题型写固定求解器。"""
@dataclass
class AgentState:
original_problem: str
api_key: str = ""
base_url: str = ""
model: str = "agnes-2.0-flash"
llm_error: str = ""
session_memory: List[str] = field(default_factory=list)
structured_problem: str = ""
plan: List[str] = field(default_factory=list)
patterns: List[AlgorithmPattern] = field(default_factory=list)
retrieved_context: str = ""
draft_answer: str = ""
script_code: str = ""
script_result: Optional[ScriptResult] = None
tool_result: str = ""
evaluation: str = ""
needs_improvement: bool = False
final_answer: str = ""
strategy: str = ""
process_log: List[str] = field(default_factory=list)
trace: List[Dict[str, str]] = field(default_factory=list)
pdf_path: str = ""
tex_path: str = ""
def add_trace(state: AgentState, module: str, title: str, content: str) -> None:
state.trace.append({"module": module, "title": title, "content": content.strip()})
def call_openai_compatible_api(
messages: List[Dict[str, str]],
api_key: str,
base_url: str,
model: str,
temperature: float = 0.2,
) -> tuple[Optional[str], str]:
if not api_key or not base_url:
return None, "未配置 API Key 或 Base URL。"
endpoint = base_url.rstrip("/")
if not endpoint.endswith("/chat/completions"):
endpoint = endpoint + "/chat/completions"
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
}
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
endpoint,
data=data,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
method="POST",
)
try:
with urllib.request.urlopen(req) as resp:
parsed = json.loads(resp.read().decode("utf-8"))
return parsed["choices"][0]["message"]["content"].strip(), ""
except urllib.error.HTTPError as exc:
try:
body = exc.read().decode("utf-8", errors="ignore")
except Exception:
body = ""
detail = body[:800] if body else str(exc)
return None, f"HTTP {exc.code}: {detail}"
except urllib.error.URLError as exc:
return None, f"网络错误: {exc.reason}"
except (KeyError, IndexError, json.JSONDecodeError) as exc:
return None, f"响应格式不符合 OpenAI-compatible chat completions: {exc}"
except TimeoutError:
return None, "请求超时。"
def parse_json_object(text: str) -> Dict[str, Any]:
match = re.search(r"\{[\s\S]*\}", text or "")
if not match:
return {}
try:
data = json.loads(match.group(0))
except json.JSONDecodeError:
return {}
return data if isinstance(data, dict) else {}
def fallback_plan(problem: str, patterns: List[AlgorithmPattern]) -> tuple[str, List[str]]:
names = "、".join(pattern.name for pattern in patterns)
structured = "\n".join(
[
"任务类型: 通用算法问题",
f"候选算法范式: {names}",
"输入: 从用户自然语言问题中抽取变量、约束、样例和目标函数。",
"输出: 算法设计、关键推导、伪代码、复杂度分析和必要的脚本验证。",
]
)
plan = [
"解析问题中的输入变量、约束条件和目标函数。",
"检索匹配的算法范式知识。",
"生成算法设计、伪代码和复杂度分析。",
"如果题目包含具体样例,则生成并运行轻量 Python 验证脚本。",
"由 Evaluator 检查答案是否覆盖目标、约束和验证结果。",
]
return structured, plan
class SessionMemory:
def run(self, state: AgentState) -> None:
state.session_memory = [
"保存用户原始算法问题。",
"记录当前公开 API 配置与模型名称。",
"记录 Planner、Retriever、Executor、Script Runner、Evaluator 和 Reflector 的中间结果。",
]
state.process_log.append("Memory: 初始化本轮算法问题求解状态。")
add_trace(
state,
"Memory",
"读取会话状态",
"\n".join(
[
"当前实现不依赖上传文件或外部课程资料。",
"会话状态只保留本轮问题、API 配置和模块输出。",
f"模型配置: {state.model}",
]
),
)
class Planner:
def run(self, state: AgentState) -> None:
state.patterns = retrieve_algorithm_patterns(state.original_problem)
prompt = f"""请把下面的算法问题结构化。只输出 JSON 对象,不要输出 Markdown。
JSON 字段:
- problem_type: 简短问题类型
- inputs: 输入变量列表
- constraints: 约束条件列表
- objective: 优化目标或证明目标
- candidate_paradigms: 候选算法范式列表
- verification_need: 是否需要脚本验证及原因
- plan: 5 个以内执行步骤
问题:
{state.original_problem}
可参考的算法范式:
{format_patterns(state.patterns)}
"""
response, error = call_openai_compatible_api(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
state.api_key,
state.base_url,
state.model,
temperature=0.1,
)
if error:
state.llm_error = error
parsed = parse_json_object(response or "")
if parsed:
state.structured_problem = json.dumps(parsed, ensure_ascii=False, indent=2)
plan = parsed.get("plan", [])
state.plan = [str(item) for item in plan] if isinstance(plan, list) else []
if not state.structured_problem or not state.plan:
state.structured_problem, state.plan = fallback_plan(state.original_problem, state.patterns)
state.process_log.append("Planner: 完成问题结构化和求解计划生成。")
add_trace(
state,
"Planner",
"结构化问题并制定计划",
"\n".join(
[
"结构化问题:",
state.structured_problem,
"",
"执行计划:",
*[f"{idx}. {step}" for idx, step in enumerate(state.plan, start=1)],
]
),
)
class Retriever:
def run(self, state: AgentState) -> None:
state.retrieved_context = format_patterns(state.patterns)
state.process_log.append("Retriever: 从内置算法范式库中检索相关知识。")
add_trace(
state,
"Retriever",
"检索算法范式知识",
state.retrieved_context,
)
class Executor:
def run(self, state: AgentState) -> None:
prompt = f"""请作为通用算法问题求解 Agent,基于结构化问题和算法范式知识给出完整求解。
要求:
1. 不要说“缺少数据”后结束;如果信息不足,要说明合理假设并继续给出通用算法设计。
2. 必须包含: 问题建模、算法选择理由、核心步骤或伪代码、正确性说明、复杂度分析。
3. 如果原题包含具体小样例,请在答案末尾给出一个可运行的 Python 验证脚本,放在 ```python 代码块中。脚本只用于验证样例,不要读写文件、不要 import。
4. 如果不适合脚本验证,明确说明原因。
原始问题:
{state.original_problem}
结构化问题:
{state.structured_problem}
检索到的算法范式:
{state.retrieved_context}
"""
response, error = call_openai_compatible_api(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
state.api_key,
state.base_url,
state.model,
temperature=0.2,
)
if error:
state.llm_error = error
state.draft_answer = response or offline_algorithm_answer(
state.original_problem,
state.patterns,
note=f"LLM 调用失败: {state.llm_error}" if state.llm_error else "",
)
state.script_code = extract_python_code(state.draft_answer)
state.process_log.append("Executor: 生成算法设计初稿和可选验证脚本。")
add_trace(
state,
"Executor",
"生成算法解答初稿",
"\n".join(
[
f"LLM 调用状态: {'成功' if response else '失败,已使用离线模板'}",
f"错误信息: {state.llm_error or '(无)'}",
"",
state.draft_answer,
]
),
)
class ScriptRunner:
def run(self, state: AgentState) -> None:
if not state.script_code:
state.script_result = ScriptResult(executed=False, ok=False, output="", error="Executor 未生成验证脚本。")
state.tool_result = "未运行脚本: Executor 未生成可执行 Python 验证代码。"
else:
state.script_result = safe_run_python(state.script_code)
state.tool_result = "\n".join(
[
"脚本执行状态:",
f"- executed: {state.script_result.executed}",
f"- ok: {state.script_result.ok}",
"输出:",
state.script_result.output or "(无输出)",
"错误:",
state.script_result.error or "(无错误)",
]
)
state.process_log.append("Script Runner: 完成可选脚本验证。")
add_trace(
state,
"Script Runner",
"运行轻量验证脚本",
state.tool_result,
)
class Evaluator:
def run(self, state: AgentState) -> None:
prompt = f"""请评估下面的算法解答是否合格。输出 JSON 对象。
字段:
- passed: true/false
- issues: 问题列表
- strengths: 优点列表
- revision_instruction: 如果需要修订,给出修订指令;否则写“无需修订”
评估标准:
1. 是否回应了原始问题。
2. 是否给出算法选择理由、步骤或伪代码。
3. 是否有正确性说明。
4. 是否有复杂度分析。
5. 如果脚本运行了,最终答案是否利用脚本结果;如果未运行脚本,原因是否合理。
原始问题:
{state.original_problem}
解答初稿:
{state.draft_answer}
脚本结果:
{state.tool_result}
"""
response, error = call_openai_compatible_api(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
state.api_key,
state.base_url,
state.model,
temperature=0.1,
)
if error:
state.llm_error = error
parsed = parse_json_object(response or "")
if parsed:
passed = bool(parsed.get("passed"))
state.needs_improvement = not passed
state.evaluation = json.dumps(parsed, ensure_ascii=False, indent=2)
else:
missing = []
draft = state.draft_answer
if len(draft) < 240:
missing.append("答案过短")
if "复杂" not in draft and "O(" not in draft:
missing.append("缺少复杂度分析")
if "正确" not in draft and "证明" not in draft:
missing.append("缺少正确性说明")
state.needs_improvement = bool(missing)
state.evaluation = "\n".join(
[
"自动检查结果:",
f"- 是否需要改进: {'是' if state.needs_improvement else '否'}",
*[f"- 待改进: {item}" for item in missing],
"- 脚本验证: " + (state.tool_result or "未运行脚本"),
]
)
state.process_log.append("Evaluator: 完成答案质量检查。")
add_trace(
state,
"Evaluator",
"检查答案质量",
state.evaluation,
)
class LoopController:
def run(self, state: AgentState) -> None:
if state.needs_improvement:
content = "Evaluator 发现待改进项,将在 Reflector 阶段按评估意见修订答案。"
else:
content = "Evaluator 判断答案可用,本轮不进入补充执行,直接进入 Reflector。"
state.process_log.append(f"Loop Controller: {content}")
add_trace(state, "Loop Controller", "决定是否修订", content)
class Reflector:
def run(self, state: AgentState) -> None:
if state.needs_improvement:
prompt = f"""请根据评估意见修订算法答案,输出最终答案。
原始问题:
{state.original_problem}
初稿:
{state.draft_answer}
脚本结果:
{state.tool_result}
评估意见:
{state.evaluation}
要求: 最终答案要自洽、可提交,包含算法思想、关键步骤、正确性说明、复杂度分析;如有脚本结果,要明确引用。
"""
response, error = call_openai_compatible_api(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
state.api_key,
state.base_url,
state.model,
temperature=0.2,
)
if error:
state.llm_error = error
state.final_answer = response or state.draft_answer
else:
state.final_answer = state.draft_answer
state.strategy = (
"本次求解没有为特定题型调用硬编码求解器,而是通过 Planner 结构化问题,"
"Retriever 检索内置算法范式,Executor 借助公开 LLM 生成算法设计,"
"Script Runner 可选运行轻量验证脚本,Evaluator 和 Reflector 负责检查与修订。"
)
state.process_log.append("Reflector: 生成最终答案。")
add_trace(
state,
"Reflector",
"反思并生成最终答案",
"\n".join([state.strategy, state.final_answer]),
)
class ArtifactWriter:
def run(self, state: AgentState) -> None:
report_data = {
"original_problem": state.original_problem,
"structured_problem": state.structured_problem,
"strategy": state.strategy,
"tool_result": "\n".join(
[
"检索到的算法范式:",
state.retrieved_context,
"",
"脚本运行结果:",
state.tool_result,
"",
"评估结果:",
state.evaluation,
]
),
"final_answer": state.final_answer,
"process_log": "\n".join(state.process_log),
"complexity_analysis": "复杂度分析由 Executor 在最终答案中针对具体算法给出;本系统的额外开销主要来自一次公开 LLM 推理、一次可选脚本运行和一次评估/修订过程。",
}
pdf_path, tex_path = write_solution_report(report_data)
state.pdf_path = pdf_path
state.tex_path = tex_path
state.process_log.append("Artifact Writer: 生成 LaTeX 源码和 PDF 报告。")
add_trace(
state,
"Artifact Writer",
"生成报告产物",
"\n".join([f"PDF: {pdf_path}", f"LaTeX: {tex_path}"]),
)
class ProblemSolvingAgent:
def __init__(self) -> None:
self.modules = [
SessionMemory(),
Planner(),
Retriever(),
Executor(),
ScriptRunner(),
Evaluator(),
LoopController(),
Reflector(),
ArtifactWriter(),
]
def run(self, state: AgentState) -> AgentState:
for module in self.modules:
module.run(state)
return state
def solve_problem(
original_problem: str,
api_key: str = "",
base_url: str = "",
model: str = "agnes-2.0-flash",
) -> Dict[str, Any]:
original_problem = original_problem.strip()
if not original_problem:
raise ValueError("请输入一个问题。")
state = AgentState(
original_problem=original_problem,
api_key=api_key,
base_url=base_url,
model=model,
)
state = ProblemSolvingAgent().run(state)
return {
"original_problem": state.original_problem,
"structured_problem": state.structured_problem,
"strategy": state.strategy,
"tool_result": state.tool_result,
"final_answer": state.final_answer,
"process_log": "\n".join(state.process_log),
"trace": state.trace,
"pdf_path": state.pdf_path,
"tex_path": state.tex_path,
}
def iter_problem_steps(
original_problem: str,
api_key: str = "",
base_url: str = "",
model: str = "agnes-2.0-flash",
):
original_problem = original_problem.strip()
if not original_problem:
raise ValueError("请输入一个问题。")
state = AgentState(
original_problem=original_problem,
api_key=api_key,
base_url=base_url,
model=model,
)
agent = ProblemSolvingAgent()
emitted = 0
for module in agent.modules:
module.run(state)
for item in state.trace[emitted:]:
yield item, state
emitted = len(state.trace)