Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import re | |
| import urllib.error | |
| import urllib.request | |
| from dataclasses import dataclass, field | |
| from typing import Any, Dict, List, Optional | |
| from reporting import write_solution_report | |
| from tools import ( | |
| AlgorithmPattern, | |
| ScriptResult, | |
| extract_python_code, | |
| format_patterns, | |
| offline_algorithm_answer, | |
| retrieve_algorithm_patterns, | |
| safe_run_python, | |
| ) | |
| SYSTEM_PROMPT = """你是一个通用算法问题优化与求解智能体。 | |
| 你不能依赖内部未公开模型,必须通过公开可访问的大模型 API 或本地轻量工具完成任务。 | |
| 你的工作流是 Memory -> Planner -> Retriever -> Executor -> Script Runner -> Evaluator -> Loop Controller -> Reflector -> Artifact Writer。 | |
| 你要解决通用算法问题,而不是为每个题型写固定求解器。""" | |
| class AgentState: | |
| original_problem: str | |
| api_key: str = "" | |
| base_url: str = "" | |
| model: str = "agnes-2.0-flash" | |
| llm_error: str = "" | |
| session_memory: List[str] = field(default_factory=list) | |
| structured_problem: str = "" | |
| plan: List[str] = field(default_factory=list) | |
| patterns: List[AlgorithmPattern] = field(default_factory=list) | |
| retrieved_context: str = "" | |
| draft_answer: str = "" | |
| script_code: str = "" | |
| script_result: Optional[ScriptResult] = None | |
| tool_result: str = "" | |
| evaluation: str = "" | |
| needs_improvement: bool = False | |
| final_answer: str = "" | |
| strategy: str = "" | |
| process_log: List[str] = field(default_factory=list) | |
| trace: List[Dict[str, str]] = field(default_factory=list) | |
| pdf_path: str = "" | |
| tex_path: str = "" | |
| def add_trace(state: AgentState, module: str, title: str, content: str) -> None: | |
| state.trace.append({"module": module, "title": title, "content": content.strip()}) | |
| def call_openai_compatible_api( | |
| messages: List[Dict[str, str]], | |
| api_key: str, | |
| base_url: str, | |
| model: str, | |
| temperature: float = 0.2, | |
| ) -> tuple[Optional[str], str]: | |
| if not api_key or not base_url: | |
| return None, "未配置 API Key 或 Base URL。" | |
| endpoint = base_url.rstrip("/") | |
| if not endpoint.endswith("/chat/completions"): | |
| endpoint = endpoint + "/chat/completions" | |
| payload = { | |
| "model": model, | |
| "messages": messages, | |
| "temperature": temperature, | |
| } | |
| data = json.dumps(payload, ensure_ascii=False).encode("utf-8") | |
| req = urllib.request.Request( | |
| endpoint, | |
| data=data, | |
| headers={ | |
| "Authorization": f"Bearer {api_key}", | |
| "Content-Type": "application/json", | |
| }, | |
| method="POST", | |
| ) | |
| try: | |
| with urllib.request.urlopen(req) as resp: | |
| parsed = json.loads(resp.read().decode("utf-8")) | |
| return parsed["choices"][0]["message"]["content"].strip(), "" | |
| except urllib.error.HTTPError as exc: | |
| try: | |
| body = exc.read().decode("utf-8", errors="ignore") | |
| except Exception: | |
| body = "" | |
| detail = body[:800] if body else str(exc) | |
| return None, f"HTTP {exc.code}: {detail}" | |
| except urllib.error.URLError as exc: | |
| return None, f"网络错误: {exc.reason}" | |
| except (KeyError, IndexError, json.JSONDecodeError) as exc: | |
| return None, f"响应格式不符合 OpenAI-compatible chat completions: {exc}" | |
| except TimeoutError: | |
| return None, "请求超时。" | |
| def parse_json_object(text: str) -> Dict[str, Any]: | |
| match = re.search(r"\{[\s\S]*\}", text or "") | |
| if not match: | |
| return {} | |
| try: | |
| data = json.loads(match.group(0)) | |
| except json.JSONDecodeError: | |
| return {} | |
| return data if isinstance(data, dict) else {} | |
| def fallback_plan(problem: str, patterns: List[AlgorithmPattern]) -> tuple[str, List[str]]: | |
| names = "、".join(pattern.name for pattern in patterns) | |
| structured = "\n".join( | |
| [ | |
| "任务类型: 通用算法问题", | |
| f"候选算法范式: {names}", | |
| "输入: 从用户自然语言问题中抽取变量、约束、样例和目标函数。", | |
| "输出: 算法设计、关键推导、伪代码、复杂度分析和必要的脚本验证。", | |
| ] | |
| ) | |
| plan = [ | |
| "解析问题中的输入变量、约束条件和目标函数。", | |
| "检索匹配的算法范式知识。", | |
| "生成算法设计、伪代码和复杂度分析。", | |
| "如果题目包含具体样例,则生成并运行轻量 Python 验证脚本。", | |
| "由 Evaluator 检查答案是否覆盖目标、约束和验证结果。", | |
| ] | |
| return structured, plan | |
| class SessionMemory: | |
| def run(self, state: AgentState) -> None: | |
| state.session_memory = [ | |
| "保存用户原始算法问题。", | |
| "记录当前公开 API 配置与模型名称。", | |
| "记录 Planner、Retriever、Executor、Script Runner、Evaluator 和 Reflector 的中间结果。", | |
| ] | |
| state.process_log.append("Memory: 初始化本轮算法问题求解状态。") | |
| add_trace( | |
| state, | |
| "Memory", | |
| "读取会话状态", | |
| "\n".join( | |
| [ | |
| "当前实现不依赖上传文件或外部课程资料。", | |
| "会话状态只保留本轮问题、API 配置和模块输出。", | |
| f"模型配置: {state.model}", | |
| ] | |
| ), | |
| ) | |
| class Planner: | |
| def run(self, state: AgentState) -> None: | |
| state.patterns = retrieve_algorithm_patterns(state.original_problem) | |
| prompt = f"""请把下面的算法问题结构化。只输出 JSON 对象,不要输出 Markdown。 | |
| JSON 字段: | |
| - problem_type: 简短问题类型 | |
| - inputs: 输入变量列表 | |
| - constraints: 约束条件列表 | |
| - objective: 优化目标或证明目标 | |
| - candidate_paradigms: 候选算法范式列表 | |
| - verification_need: 是否需要脚本验证及原因 | |
| - plan: 5 个以内执行步骤 | |
| 问题: | |
| {state.original_problem} | |
| 可参考的算法范式: | |
| {format_patterns(state.patterns)} | |
| """ | |
| response, error = call_openai_compatible_api( | |
| [ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": prompt}, | |
| ], | |
| state.api_key, | |
| state.base_url, | |
| state.model, | |
| temperature=0.1, | |
| ) | |
| if error: | |
| state.llm_error = error | |
| parsed = parse_json_object(response or "") | |
| if parsed: | |
| state.structured_problem = json.dumps(parsed, ensure_ascii=False, indent=2) | |
| plan = parsed.get("plan", []) | |
| state.plan = [str(item) for item in plan] if isinstance(plan, list) else [] | |
| if not state.structured_problem or not state.plan: | |
| state.structured_problem, state.plan = fallback_plan(state.original_problem, state.patterns) | |
| state.process_log.append("Planner: 完成问题结构化和求解计划生成。") | |
| add_trace( | |
| state, | |
| "Planner", | |
| "结构化问题并制定计划", | |
| "\n".join( | |
| [ | |
| "结构化问题:", | |
| state.structured_problem, | |
| "", | |
| "执行计划:", | |
| *[f"{idx}. {step}" for idx, step in enumerate(state.plan, start=1)], | |
| ] | |
| ), | |
| ) | |
| class Retriever: | |
| def run(self, state: AgentState) -> None: | |
| state.retrieved_context = format_patterns(state.patterns) | |
| state.process_log.append("Retriever: 从内置算法范式库中检索相关知识。") | |
| add_trace( | |
| state, | |
| "Retriever", | |
| "检索算法范式知识", | |
| state.retrieved_context, | |
| ) | |
| class Executor: | |
| def run(self, state: AgentState) -> None: | |
| prompt = f"""请作为通用算法问题求解 Agent,基于结构化问题和算法范式知识给出完整求解。 | |
| 要求: | |
| 1. 不要说“缺少数据”后结束;如果信息不足,要说明合理假设并继续给出通用算法设计。 | |
| 2. 必须包含: 问题建模、算法选择理由、核心步骤或伪代码、正确性说明、复杂度分析。 | |
| 3. 如果原题包含具体小样例,请在答案末尾给出一个可运行的 Python 验证脚本,放在 ```python 代码块中。脚本只用于验证样例,不要读写文件、不要 import。 | |
| 4. 如果不适合脚本验证,明确说明原因。 | |
| 原始问题: | |
| {state.original_problem} | |
| 结构化问题: | |
| {state.structured_problem} | |
| 检索到的算法范式: | |
| {state.retrieved_context} | |
| """ | |
| response, error = call_openai_compatible_api( | |
| [ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": prompt}, | |
| ], | |
| state.api_key, | |
| state.base_url, | |
| state.model, | |
| temperature=0.2, | |
| ) | |
| if error: | |
| state.llm_error = error | |
| state.draft_answer = response or offline_algorithm_answer( | |
| state.original_problem, | |
| state.patterns, | |
| note=f"LLM 调用失败: {state.llm_error}" if state.llm_error else "", | |
| ) | |
| state.script_code = extract_python_code(state.draft_answer) | |
| state.process_log.append("Executor: 生成算法设计初稿和可选验证脚本。") | |
| add_trace( | |
| state, | |
| "Executor", | |
| "生成算法解答初稿", | |
| "\n".join( | |
| [ | |
| f"LLM 调用状态: {'成功' if response else '失败,已使用离线模板'}", | |
| f"错误信息: {state.llm_error or '(无)'}", | |
| "", | |
| state.draft_answer, | |
| ] | |
| ), | |
| ) | |
| class ScriptRunner: | |
| def run(self, state: AgentState) -> None: | |
| if not state.script_code: | |
| state.script_result = ScriptResult(executed=False, ok=False, output="", error="Executor 未生成验证脚本。") | |
| state.tool_result = "未运行脚本: Executor 未生成可执行 Python 验证代码。" | |
| else: | |
| state.script_result = safe_run_python(state.script_code) | |
| state.tool_result = "\n".join( | |
| [ | |
| "脚本执行状态:", | |
| f"- executed: {state.script_result.executed}", | |
| f"- ok: {state.script_result.ok}", | |
| "输出:", | |
| state.script_result.output or "(无输出)", | |
| "错误:", | |
| state.script_result.error or "(无错误)", | |
| ] | |
| ) | |
| state.process_log.append("Script Runner: 完成可选脚本验证。") | |
| add_trace( | |
| state, | |
| "Script Runner", | |
| "运行轻量验证脚本", | |
| state.tool_result, | |
| ) | |
| class Evaluator: | |
| def run(self, state: AgentState) -> None: | |
| prompt = f"""请评估下面的算法解答是否合格。输出 JSON 对象。 | |
| 字段: | |
| - passed: true/false | |
| - issues: 问题列表 | |
| - strengths: 优点列表 | |
| - revision_instruction: 如果需要修订,给出修订指令;否则写“无需修订” | |
| 评估标准: | |
| 1. 是否回应了原始问题。 | |
| 2. 是否给出算法选择理由、步骤或伪代码。 | |
| 3. 是否有正确性说明。 | |
| 4. 是否有复杂度分析。 | |
| 5. 如果脚本运行了,最终答案是否利用脚本结果;如果未运行脚本,原因是否合理。 | |
| 原始问题: | |
| {state.original_problem} | |
| 解答初稿: | |
| {state.draft_answer} | |
| 脚本结果: | |
| {state.tool_result} | |
| """ | |
| response, error = call_openai_compatible_api( | |
| [ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": prompt}, | |
| ], | |
| state.api_key, | |
| state.base_url, | |
| state.model, | |
| temperature=0.1, | |
| ) | |
| if error: | |
| state.llm_error = error | |
| parsed = parse_json_object(response or "") | |
| if parsed: | |
| passed = bool(parsed.get("passed")) | |
| state.needs_improvement = not passed | |
| state.evaluation = json.dumps(parsed, ensure_ascii=False, indent=2) | |
| else: | |
| missing = [] | |
| draft = state.draft_answer | |
| if len(draft) < 240: | |
| missing.append("答案过短") | |
| if "复杂" not in draft and "O(" not in draft: | |
| missing.append("缺少复杂度分析") | |
| if "正确" not in draft and "证明" not in draft: | |
| missing.append("缺少正确性说明") | |
| state.needs_improvement = bool(missing) | |
| state.evaluation = "\n".join( | |
| [ | |
| "自动检查结果:", | |
| f"- 是否需要改进: {'是' if state.needs_improvement else '否'}", | |
| *[f"- 待改进: {item}" for item in missing], | |
| "- 脚本验证: " + (state.tool_result or "未运行脚本"), | |
| ] | |
| ) | |
| state.process_log.append("Evaluator: 完成答案质量检查。") | |
| add_trace( | |
| state, | |
| "Evaluator", | |
| "检查答案质量", | |
| state.evaluation, | |
| ) | |
| class LoopController: | |
| def run(self, state: AgentState) -> None: | |
| if state.needs_improvement: | |
| content = "Evaluator 发现待改进项,将在 Reflector 阶段按评估意见修订答案。" | |
| else: | |
| content = "Evaluator 判断答案可用,本轮不进入补充执行,直接进入 Reflector。" | |
| state.process_log.append(f"Loop Controller: {content}") | |
| add_trace(state, "Loop Controller", "决定是否修订", content) | |
| class Reflector: | |
| def run(self, state: AgentState) -> None: | |
| if state.needs_improvement: | |
| prompt = f"""请根据评估意见修订算法答案,输出最终答案。 | |
| 原始问题: | |
| {state.original_problem} | |
| 初稿: | |
| {state.draft_answer} | |
| 脚本结果: | |
| {state.tool_result} | |
| 评估意见: | |
| {state.evaluation} | |
| 要求: 最终答案要自洽、可提交,包含算法思想、关键步骤、正确性说明、复杂度分析;如有脚本结果,要明确引用。 | |
| """ | |
| response, error = call_openai_compatible_api( | |
| [ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": prompt}, | |
| ], | |
| state.api_key, | |
| state.base_url, | |
| state.model, | |
| temperature=0.2, | |
| ) | |
| if error: | |
| state.llm_error = error | |
| state.final_answer = response or state.draft_answer | |
| else: | |
| state.final_answer = state.draft_answer | |
| state.strategy = ( | |
| "本次求解没有为特定题型调用硬编码求解器,而是通过 Planner 结构化问题," | |
| "Retriever 检索内置算法范式,Executor 借助公开 LLM 生成算法设计," | |
| "Script Runner 可选运行轻量验证脚本,Evaluator 和 Reflector 负责检查与修订。" | |
| ) | |
| state.process_log.append("Reflector: 生成最终答案。") | |
| add_trace( | |
| state, | |
| "Reflector", | |
| "反思并生成最终答案", | |
| "\n".join([state.strategy, state.final_answer]), | |
| ) | |
| class ArtifactWriter: | |
| def run(self, state: AgentState) -> None: | |
| report_data = { | |
| "original_problem": state.original_problem, | |
| "structured_problem": state.structured_problem, | |
| "strategy": state.strategy, | |
| "tool_result": "\n".join( | |
| [ | |
| "检索到的算法范式:", | |
| state.retrieved_context, | |
| "", | |
| "脚本运行结果:", | |
| state.tool_result, | |
| "", | |
| "评估结果:", | |
| state.evaluation, | |
| ] | |
| ), | |
| "final_answer": state.final_answer, | |
| "process_log": "\n".join(state.process_log), | |
| "complexity_analysis": "复杂度分析由 Executor 在最终答案中针对具体算法给出;本系统的额外开销主要来自一次公开 LLM 推理、一次可选脚本运行和一次评估/修订过程。", | |
| } | |
| pdf_path, tex_path = write_solution_report(report_data) | |
| state.pdf_path = pdf_path | |
| state.tex_path = tex_path | |
| state.process_log.append("Artifact Writer: 生成 LaTeX 源码和 PDF 报告。") | |
| add_trace( | |
| state, | |
| "Artifact Writer", | |
| "生成报告产物", | |
| "\n".join([f"PDF: {pdf_path}", f"LaTeX: {tex_path}"]), | |
| ) | |
| class ProblemSolvingAgent: | |
| def __init__(self) -> None: | |
| self.modules = [ | |
| SessionMemory(), | |
| Planner(), | |
| Retriever(), | |
| Executor(), | |
| ScriptRunner(), | |
| Evaluator(), | |
| LoopController(), | |
| Reflector(), | |
| ArtifactWriter(), | |
| ] | |
| def run(self, state: AgentState) -> AgentState: | |
| for module in self.modules: | |
| module.run(state) | |
| return state | |
| def solve_problem( | |
| original_problem: str, | |
| api_key: str = "", | |
| base_url: str = "", | |
| model: str = "agnes-2.0-flash", | |
| ) -> Dict[str, Any]: | |
| original_problem = original_problem.strip() | |
| if not original_problem: | |
| raise ValueError("请输入一个问题。") | |
| state = AgentState( | |
| original_problem=original_problem, | |
| api_key=api_key, | |
| base_url=base_url, | |
| model=model, | |
| ) | |
| state = ProblemSolvingAgent().run(state) | |
| return { | |
| "original_problem": state.original_problem, | |
| "structured_problem": state.structured_problem, | |
| "strategy": state.strategy, | |
| "tool_result": state.tool_result, | |
| "final_answer": state.final_answer, | |
| "process_log": "\n".join(state.process_log), | |
| "trace": state.trace, | |
| "pdf_path": state.pdf_path, | |
| "tex_path": state.tex_path, | |
| } | |
| def iter_problem_steps( | |
| original_problem: str, | |
| api_key: str = "", | |
| base_url: str = "", | |
| model: str = "agnes-2.0-flash", | |
| ): | |
| original_problem = original_problem.strip() | |
| if not original_problem: | |
| raise ValueError("请输入一个问题。") | |
| state = AgentState( | |
| original_problem=original_problem, | |
| api_key=api_key, | |
| base_url=base_url, | |
| model=model, | |
| ) | |
| agent = ProblemSolvingAgent() | |
| emitted = 0 | |
| for module in agent.modules: | |
| module.run(state) | |
| for item in state.trace[emitted:]: | |
| yield item, state | |
| emitted = len(state.trace) | |