File size: 18,782 Bytes
ae0a268
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
from __future__ import annotations

import json
import re
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

from reporting import write_solution_report
from tools import (
    AlgorithmPattern,
    ScriptResult,
    extract_python_code,
    format_patterns,
    offline_algorithm_answer,
    retrieve_algorithm_patterns,
    safe_run_python,
)


SYSTEM_PROMPT = """你是一个通用算法问题优化与求解智能体。
你不能依赖内部未公开模型,必须通过公开可访问的大模型 API 或本地轻量工具完成任务。
你的工作流是 Memory -> Planner -> Retriever -> Executor -> Script Runner -> Evaluator -> Loop Controller -> Reflector -> Artifact Writer。
你要解决通用算法问题,而不是为每个题型写固定求解器。"""


@dataclass
class AgentState:
    original_problem: str
    api_key: str = ""
    base_url: str = ""
    model: str = "agnes-2.0-flash"
    llm_error: str = ""
    session_memory: List[str] = field(default_factory=list)
    structured_problem: str = ""
    plan: List[str] = field(default_factory=list)
    patterns: List[AlgorithmPattern] = field(default_factory=list)
    retrieved_context: str = ""
    draft_answer: str = ""
    script_code: str = ""
    script_result: Optional[ScriptResult] = None
    tool_result: str = ""
    evaluation: str = ""
    needs_improvement: bool = False
    final_answer: str = ""
    strategy: str = ""
    process_log: List[str] = field(default_factory=list)
    trace: List[Dict[str, str]] = field(default_factory=list)
    pdf_path: str = ""
    tex_path: str = ""


def add_trace(state: AgentState, module: str, title: str, content: str) -> None:
    state.trace.append({"module": module, "title": title, "content": content.strip()})


def call_openai_compatible_api(
    messages: List[Dict[str, str]],
    api_key: str,
    base_url: str,
    model: str,
    temperature: float = 0.2,
) -> tuple[Optional[str], str]:
    if not api_key or not base_url:
        return None, "未配置 API Key 或 Base URL。"

    endpoint = base_url.rstrip("/")
    if not endpoint.endswith("/chat/completions"):
        endpoint = endpoint + "/chat/completions"

    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
    }
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        endpoint,
        data=data,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(req) as resp:
            parsed = json.loads(resp.read().decode("utf-8"))
        return parsed["choices"][0]["message"]["content"].strip(), ""
    except urllib.error.HTTPError as exc:
        try:
            body = exc.read().decode("utf-8", errors="ignore")
        except Exception:
            body = ""
        detail = body[:800] if body else str(exc)
        return None, f"HTTP {exc.code}: {detail}"
    except urllib.error.URLError as exc:
        return None, f"网络错误: {exc.reason}"
    except (KeyError, IndexError, json.JSONDecodeError) as exc:
        return None, f"响应格式不符合 OpenAI-compatible chat completions: {exc}"
    except TimeoutError:
        return None, "请求超时。"


def parse_json_object(text: str) -> Dict[str, Any]:
    match = re.search(r"\{[\s\S]*\}", text or "")
    if not match:
        return {}
    try:
        data = json.loads(match.group(0))
    except json.JSONDecodeError:
        return {}
    return data if isinstance(data, dict) else {}


def fallback_plan(problem: str, patterns: List[AlgorithmPattern]) -> tuple[str, List[str]]:
    names = "、".join(pattern.name for pattern in patterns)
    structured = "\n".join(
        [
            "任务类型: 通用算法问题",
            f"候选算法范式: {names}",
            "输入: 从用户自然语言问题中抽取变量、约束、样例和目标函数。",
            "输出: 算法设计、关键推导、伪代码、复杂度分析和必要的脚本验证。",
        ]
    )
    plan = [
        "解析问题中的输入变量、约束条件和目标函数。",
        "检索匹配的算法范式知识。",
        "生成算法设计、伪代码和复杂度分析。",
        "如果题目包含具体样例,则生成并运行轻量 Python 验证脚本。",
        "由 Evaluator 检查答案是否覆盖目标、约束和验证结果。",
    ]
    return structured, plan


class SessionMemory:
    def run(self, state: AgentState) -> None:
        state.session_memory = [
            "保存用户原始算法问题。",
            "记录当前公开 API 配置与模型名称。",
            "记录 Planner、Retriever、Executor、Script Runner、Evaluator 和 Reflector 的中间结果。",
        ]
        state.process_log.append("Memory: 初始化本轮算法问题求解状态。")
        add_trace(
            state,
            "Memory",
            "读取会话状态",
            "\n".join(
                [
                    "当前实现不依赖上传文件或外部课程资料。",
                    "会话状态只保留本轮问题、API 配置和模块输出。",
                    f"模型配置: {state.model}",
                ]
            ),
        )


class Planner:
    def run(self, state: AgentState) -> None:
        state.patterns = retrieve_algorithm_patterns(state.original_problem)
        prompt = f"""请把下面的算法问题结构化。只输出 JSON 对象,不要输出 Markdown。

JSON 字段:
- problem_type: 简短问题类型
- inputs: 输入变量列表
- constraints: 约束条件列表
- objective: 优化目标或证明目标
- candidate_paradigms: 候选算法范式列表
- verification_need: 是否需要脚本验证及原因
- plan: 5 个以内执行步骤

问题:
{state.original_problem}

可参考的算法范式:
{format_patterns(state.patterns)}
"""
        response, error = call_openai_compatible_api(
            [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            state.api_key,
            state.base_url,
            state.model,
            temperature=0.1,
        )
        if error:
            state.llm_error = error
        parsed = parse_json_object(response or "")
        if parsed:
            state.structured_problem = json.dumps(parsed, ensure_ascii=False, indent=2)
            plan = parsed.get("plan", [])
            state.plan = [str(item) for item in plan] if isinstance(plan, list) else []
        if not state.structured_problem or not state.plan:
            state.structured_problem, state.plan = fallback_plan(state.original_problem, state.patterns)

        state.process_log.append("Planner: 完成问题结构化和求解计划生成。")
        add_trace(
            state,
            "Planner",
            "结构化问题并制定计划",
            "\n".join(
                [
                    "结构化问题:",
                    state.structured_problem,
                    "",
                    "执行计划:",
                    *[f"{idx}. {step}" for idx, step in enumerate(state.plan, start=1)],
                ]
            ),
        )


class Retriever:
    def run(self, state: AgentState) -> None:
        state.retrieved_context = format_patterns(state.patterns)
        state.process_log.append("Retriever: 从内置算法范式库中检索相关知识。")
        add_trace(
            state,
            "Retriever",
            "检索算法范式知识",
            state.retrieved_context,
        )


class Executor:
    def run(self, state: AgentState) -> None:
        prompt = f"""请作为通用算法问题求解 Agent,基于结构化问题和算法范式知识给出完整求解。

要求:
1. 不要说“缺少数据”后结束;如果信息不足,要说明合理假设并继续给出通用算法设计。
2. 必须包含: 问题建模、算法选择理由、核心步骤或伪代码、正确性说明、复杂度分析。
3. 如果原题包含具体小样例,请在答案末尾给出一个可运行的 Python 验证脚本,放在 ```python 代码块中。脚本只用于验证样例,不要读写文件、不要 import。
4. 如果不适合脚本验证,明确说明原因。

原始问题:
{state.original_problem}

结构化问题:
{state.structured_problem}

检索到的算法范式:
{state.retrieved_context}
"""
        response, error = call_openai_compatible_api(
            [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            state.api_key,
            state.base_url,
            state.model,
            temperature=0.2,
        )
        if error:
            state.llm_error = error
        state.draft_answer = response or offline_algorithm_answer(
            state.original_problem,
            state.patterns,
            note=f"LLM 调用失败: {state.llm_error}" if state.llm_error else "",
        )
        state.script_code = extract_python_code(state.draft_answer)
        state.process_log.append("Executor: 生成算法设计初稿和可选验证脚本。")
        add_trace(
            state,
            "Executor",
            "生成算法解答初稿",
            "\n".join(
                [
                    f"LLM 调用状态: {'成功' if response else '失败,已使用离线模板'}",
                    f"错误信息: {state.llm_error or '(无)'}",
                    "",
                    state.draft_answer,
                ]
            ),
        )


class ScriptRunner:
    def run(self, state: AgentState) -> None:
        if not state.script_code:
            state.script_result = ScriptResult(executed=False, ok=False, output="", error="Executor 未生成验证脚本。")
            state.tool_result = "未运行脚本: Executor 未生成可执行 Python 验证代码。"
        else:
            state.script_result = safe_run_python(state.script_code)
            state.tool_result = "\n".join(
                [
                    "脚本执行状态:",
                    f"- executed: {state.script_result.executed}",
                    f"- ok: {state.script_result.ok}",
                    "输出:",
                    state.script_result.output or "(无输出)",
                    "错误:",
                    state.script_result.error or "(无错误)",
                ]
            )
        state.process_log.append("Script Runner: 完成可选脚本验证。")
        add_trace(
            state,
            "Script Runner",
            "运行轻量验证脚本",
            state.tool_result,
        )


class Evaluator:
    def run(self, state: AgentState) -> None:
        prompt = f"""请评估下面的算法解答是否合格。输出 JSON 对象。

字段:
- passed: true/false
- issues: 问题列表
- strengths: 优点列表
- revision_instruction: 如果需要修订,给出修订指令;否则写“无需修订”

评估标准:
1. 是否回应了原始问题。
2. 是否给出算法选择理由、步骤或伪代码。
3. 是否有正确性说明。
4. 是否有复杂度分析。
5. 如果脚本运行了,最终答案是否利用脚本结果;如果未运行脚本,原因是否合理。

原始问题:
{state.original_problem}

解答初稿:
{state.draft_answer}

脚本结果:
{state.tool_result}
"""
        response, error = call_openai_compatible_api(
            [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            state.api_key,
            state.base_url,
            state.model,
            temperature=0.1,
        )
        if error:
            state.llm_error = error
        parsed = parse_json_object(response or "")
        if parsed:
            passed = bool(parsed.get("passed"))
            state.needs_improvement = not passed
            state.evaluation = json.dumps(parsed, ensure_ascii=False, indent=2)
        else:
            missing = []
            draft = state.draft_answer
            if len(draft) < 240:
                missing.append("答案过短")
            if "复杂" not in draft and "O(" not in draft:
                missing.append("缺少复杂度分析")
            if "正确" not in draft and "证明" not in draft:
                missing.append("缺少正确性说明")
            state.needs_improvement = bool(missing)
            state.evaluation = "\n".join(
                [
                    "自动检查结果:",
                    f"- 是否需要改进: {'是' if state.needs_improvement else '否'}",
                    *[f"- 待改进: {item}" for item in missing],
                    "- 脚本验证: " + (state.tool_result or "未运行脚本"),
                ]
            )
        state.process_log.append("Evaluator: 完成答案质量检查。")
        add_trace(
            state,
            "Evaluator",
            "检查答案质量",
            state.evaluation,
        )


class LoopController:
    def run(self, state: AgentState) -> None:
        if state.needs_improvement:
            content = "Evaluator 发现待改进项,将在 Reflector 阶段按评估意见修订答案。"
        else:
            content = "Evaluator 判断答案可用,本轮不进入补充执行,直接进入 Reflector。"
        state.process_log.append(f"Loop Controller: {content}")
        add_trace(state, "Loop Controller", "决定是否修订", content)


class Reflector:
    def run(self, state: AgentState) -> None:
        if state.needs_improvement:
            prompt = f"""请根据评估意见修订算法答案,输出最终答案。

原始问题:
{state.original_problem}

初稿:
{state.draft_answer}

脚本结果:
{state.tool_result}

评估意见:
{state.evaluation}

要求: 最终答案要自洽、可提交,包含算法思想、关键步骤、正确性说明、复杂度分析;如有脚本结果,要明确引用。
"""
            response, error = call_openai_compatible_api(
                [
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                state.api_key,
                state.base_url,
                state.model,
                temperature=0.2,
            )
            if error:
                state.llm_error = error
            state.final_answer = response or state.draft_answer
        else:
            state.final_answer = state.draft_answer

        state.strategy = (
            "本次求解没有为特定题型调用硬编码求解器,而是通过 Planner 结构化问题,"
            "Retriever 检索内置算法范式,Executor 借助公开 LLM 生成算法设计,"
            "Script Runner 可选运行轻量验证脚本,Evaluator 和 Reflector 负责检查与修订。"
        )
        state.process_log.append("Reflector: 生成最终答案。")
        add_trace(
            state,
            "Reflector",
            "反思并生成最终答案",
            "\n".join([state.strategy, state.final_answer]),
        )


class ArtifactWriter:
    def run(self, state: AgentState) -> None:
        report_data = {
            "original_problem": state.original_problem,
            "structured_problem": state.structured_problem,
            "strategy": state.strategy,
            "tool_result": "\n".join(
                [
                    "检索到的算法范式:",
                    state.retrieved_context,
                    "",
                    "脚本运行结果:",
                    state.tool_result,
                    "",
                    "评估结果:",
                    state.evaluation,
                ]
            ),
            "final_answer": state.final_answer,
            "process_log": "\n".join(state.process_log),
            "complexity_analysis": "复杂度分析由 Executor 在最终答案中针对具体算法给出;本系统的额外开销主要来自一次公开 LLM 推理、一次可选脚本运行和一次评估/修订过程。",
        }
        pdf_path, tex_path = write_solution_report(report_data)
        state.pdf_path = pdf_path
        state.tex_path = tex_path
        state.process_log.append("Artifact Writer: 生成 LaTeX 源码和 PDF 报告。")
        add_trace(
            state,
            "Artifact Writer",
            "生成报告产物",
            "\n".join([f"PDF: {pdf_path}", f"LaTeX: {tex_path}"]),
        )


class ProblemSolvingAgent:
    def __init__(self) -> None:
        self.modules = [
            SessionMemory(),
            Planner(),
            Retriever(),
            Executor(),
            ScriptRunner(),
            Evaluator(),
            LoopController(),
            Reflector(),
            ArtifactWriter(),
        ]

    def run(self, state: AgentState) -> AgentState:
        for module in self.modules:
            module.run(state)
        return state


def solve_problem(
    original_problem: str,
    api_key: str = "",
    base_url: str = "",
    model: str = "agnes-2.0-flash",
) -> Dict[str, Any]:
    original_problem = original_problem.strip()
    if not original_problem:
        raise ValueError("请输入一个问题。")

    state = AgentState(
        original_problem=original_problem,
        api_key=api_key,
        base_url=base_url,
        model=model,
    )
    state = ProblemSolvingAgent().run(state)
    return {
        "original_problem": state.original_problem,
        "structured_problem": state.structured_problem,
        "strategy": state.strategy,
        "tool_result": state.tool_result,
        "final_answer": state.final_answer,
        "process_log": "\n".join(state.process_log),
        "trace": state.trace,
        "pdf_path": state.pdf_path,
        "tex_path": state.tex_path,
    }


def iter_problem_steps(
    original_problem: str,
    api_key: str = "",
    base_url: str = "",
    model: str = "agnes-2.0-flash",
):
    original_problem = original_problem.strip()
    if not original_problem:
        raise ValueError("请输入一个问题。")

    state = AgentState(
        original_problem=original_problem,
        api_key=api_key,
        base_url=base_url,
        model=model,
    )
    agent = ProblemSolvingAgent()
    emitted = 0
    for module in agent.modules:
        module.run(state)
        for item in state.trace[emitted:]:
            yield item, state
        emitted = len(state.trace)