| | |
| | |
| | """ |
| | LoongFlow HuggingFace Spaces Demo |
| | 展示 PEES (Plan-Execute-Execute-Summary) 进化式 Agent 工作流程 |
| | """ |
| |
|
| | import gradio as gr |
| | import pandas as pd |
| | import time |
| | import random |
| | from typing import List, Dict, Any, Tuple |
| |
|
| | |
| | |
| | |
| |
|
def simulate_planner(task: str) -> Dict[str, Any]:
    """Simulate the Planner phase by producing a canned strategic plan.

    Sleeps briefly to mimic work, picks one of several fixed strategy
    sentences at random and embeds ``task`` into a fixed Markdown outline.

    Args:
        task: The user's task description; interpolated verbatim into the plan.

    Returns:
        A dict with the keys ``role``, ``thought``, ``plan`` and ``timestamp``
        (``timestamp`` is the local wall-clock time formatted as HH:MM:SS).
    """
    time.sleep(0.3)

    candidate_thoughts = (
        "我将采用分治策略,把任务分解为多个子问题分别解决。",
        "首先进行需求分析,然后设计系统架构,最后逐步实现。",
        "使用迭代式开发,从最小可行产品开始,逐步添加功能。",
        "采用自顶向下的方法,先定义接口,再实现具体逻辑。",
    )

    # The outline is assembled line by line so that no source-code
    # indentation can leak into the rendered Markdown.
    outline = [
        "## 任务分析",
        f"- 用户需求: {task}",
        "",
        "## 战略规划",
        "1. 理解任务本质和目标",
        "2. 设计整体架构方案",
        "3. 制定分步实施计划",
        "4. 预留扩展和优化空间",
    ]

    return {
        "role": "Planner",
        "thought": random.choice(candidate_thoughts),
        "plan": "\n".join(outline),
        "timestamp": time.strftime("%H:%M:%S"),
    }
| |
|
| |
|
# Keyword groups that select a specialised implementation sample, checked in
# order. Chinese keywords are included because the UI placeholder suggests
# Chinese task descriptions, which never contain the English keys.
_IMPL_KEYWORDS = (
    ("todo", ("todo", "待办")),
    ("file", ("file", "文件")),
)

# Canned implementation snippets shown in the "Execute" tab (Markdown fenced
# code). They are display-only text and are never executed by this module.
_IMPL_SAMPLES = {
    "todo": '''```python
# Todo List App - 实现
from datetime import datetime


class TodoList:
    def __init__(self):
        self.tasks = []

    def add_task(self, title, priority="medium"):
        task = {
            "id": len(self.tasks) + 1,
            "title": title,
            "priority": priority,
            "done": False,
            "created_at": datetime.now()
        }
        self.tasks.append(task)
        return task

    def complete_task(self, task_id):
        for task in self.tasks:
            if task["id"] == task_id:
                task["done"] = True
                return True
        return False

    def get_pending(self):
        return [t for t in self.tasks if not t["done"]]
```''',
    "file": '''```python
# File Processor - 实现
import os
import shutil
from pathlib import Path

class FileProcessor:
    def __init__(self, input_dir, output_dir):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)

    def process_all(self):
        results = []
        for filepath in self.input_dir.rglob("*"):
            if filepath.is_file():
                dest = self.output_dir / filepath.relative_to(self.input_dir)
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(filepath, dest)
                results.append({"file": str(filepath), "status": "copied"})
        return results
```''',
    "default": '''```python
# Solution Implementation - 实现
class Solution:
    def __init__(self, task):
        self.task = task
        self.components = {}

    def analyze(self):
        """分析任务需求"""
        return {"requirements": "...", "constraints": "..."}

    def design(self):
        """设计解决方案"""
        return {"architecture": "...", "flow": "..."}

    def implement(self):
        """实现代码"""
        return {"code": "...", "tests": "..."}

    def run(self):
        return self.implement()
```''',
}


def simulate_executor(task: str, plan: str) -> Dict[str, Any]:
    """Simulate the first Execute phase by returning a canned implementation.

    The sample is chosen by case-insensitive keyword match on ``task``:
    "todo"/"待办" selects the todo-list sample, "file"/"文件" the file
    processor sample, and anything else the generic sample. When several
    keywords match, the todo sample wins (it is checked first).

    Args:
        task: The user's task description used for keyword matching.
        plan: The plan produced by the Planner phase. Accepted for interface
            symmetry; it does not influence the result.

    Returns:
        A dict with the keys ``role``, ``action``, ``code``, ``result`` and
        ``timestamp`` (local wall-clock time formatted as HH:MM:SS).
    """
    time.sleep(0.5)

    # Lower-case once instead of once per candidate keyword.
    lowered = task.lower()
    sample_key = next(
        (
            key
            for key, words in _IMPL_KEYWORDS
            if any(word in lowered for word in words)
        ),
        "default",
    )

    return {
        "role": "Executor",
        "action": "编写并执行实现代码",
        "code": _IMPL_SAMPLES[sample_key],
        "result": "代码实现完成",
        "timestamp": time.strftime("%H:%M:%S"),
    }
| |
|
| |
|
| | def simulate_executor2(task: str, previous_result: str) -> Dict[str, Any]: |
| | """模拟第二个 Execute 阶段 - 验证测试""" |
| | time.sleep(0.4) |
| | |
| | test_samples = { |
| | "todo": '''```python |
| | # 测试用例 |
| | def test_todo_list(): |
| | todo = TodoList() |
| | |
| | # 测试添加任务 |
| | task = todo.add_task("完成报告", "high") |
| | assert task["title"] == "完成报告" |
| | assert task["priority"] == "high" |
| | |
| | # 测试完成任务 |
| | todo.complete_task(task["id"]) |
| | assert task["done"] == True |
| | |
| | # 测试获取待办 |
| | pending = todo.get_pending() |
| | assert len(pending) == 0 |
| | |
| | print("所有测试通过!") |
| | ```''', |
| | "file": '''```python |
| | # 测试用例 |
| | def test_file_processor(): |
| | processor = FileProcessor("input", "output") |
| | |
| | # 创建测试文件 |
| | os.makedirs("input", exist_ok=True) |
| | with open("input/test.txt", "w") as f: |
| | f.write("test") |
| | |
| | # 执行处理 |
| | results = processor.process_all() |
| | |
| | # 验证结果 |
| | assert os.path.exists("output/test.txt") |
| | assert len(results) == 1 |
| | |
| | print("所有测试通过!") |
| | ```''', |
| | "default": '''```python |
| | # 验证测试 |
| | def test_solution(): |
| | solution = Solution("task") |
| | |
| | # 测试各个组件 |
| | analysis = solution.analyze() |
| | assert analysis is not None |
| | |
| | design = solution.design() |
| | assert design is not None |
| | |
| | result = solution.run() |
| | assert result is not None |
| | |
| | print("所有测试通过!") |
| | ```''' |
| | } |
| | |
| | test_code = test_samples.get("default") |
| | for key, c in test_samples.items(): |
| | if key in task.lower(): |
| | test_code = c |
| | break |
| | |
| | return { |
| | "role": "Executor2", |
| | "action": "编写并运行测试用例", |
| | "code": test_code, |
| | "result": "测试执行完成", |
| | "timestamp": time.strftime("%H:%M:%S") |
| | } |
| |
|
| |
|
def simulate_summary(iteration: int, score: float, target: float) -> Dict[str, Any]:
    """Simulate the Summary phase: derive a new score and a canned reflection.

    The new score depends on how far ``score`` is below ``target``:
    a large gap (> 0.3) adds a random gain of 0.18-0.28; a medium gap
    (> 0.1) closes 50-70% of the gap plus a small random jitter; otherwise
    the score lands slightly above ``target``. The result is clamped to the
    range [0.15, 1.0].

    Args:
        iteration: Current iteration number. Accepted but not used.
        score: Score before this iteration.
        target: Score the run is trying to reach.

    Returns:
        A dict with the keys ``role``, ``reflection``, ``improvement``,
        ``score`` (the new clamped score) and ``timestamp`` (HH:MM:SS).
    """
    time.sleep(0.3)

    praise = (
        "本次迭代成功实现了核心功能,分数有明显提升。",
        "代码结构良好,解决方案更优雅。",
        "测试覆盖完整,边界情况处理得当。",
        "验证通过,性能达到预期。",
    )
    criticism = (
        "本次迭代遇到一些问题,分数略有下降。",
        "实现方案有缺陷,需要重新调整。",
        "某些边界情况未处理好,导致扣分。",
        "测试未完全通过,需要修复。",
    )
    keep_going = (
        "继续保持当前良好的实现方式",
        "建议扩展更多功能",
        "可以尝试更多边界情况",
    )
    fix_up = (
        "需要修复实现的bug",
        "建议优化代码结构",
        "需要添加更多的错误处理",
        "考虑性能优化",
    )

    remaining = target - score
    if remaining > 0.3:
        # Far from the target: take a large fixed-range step.
        raw_score = score + random.uniform(0.18, 0.28)
    elif remaining > 0.1:
        # Getting closer: close part of the gap, with a little noise.
        gain = remaining * random.uniform(0.5, 0.7)
        jitter = random.uniform(-0.05, 0.05)
        raw_score = score + gain + jitter
    else:
        # Close enough (or already past): land just above the target.
        raw_score = target + random.uniform(0.02, 0.08)

    clamped = max(0.15, min(1.0, raw_score))

    # Positive wording when the score did not drop, negative otherwise.
    if clamped >= score:
        reflection_pool, advice_pool = praise, keep_going
    else:
        reflection_pool, advice_pool = criticism, fix_up

    return {
        "role": "Summary",
        "reflection": random.choice(reflection_pool),
        "improvement": random.choice(advice_pool),
        "score": clamped,
        "timestamp": time.strftime("%H:%M:%S"),
    }
| |
|
| |
|
def run_pees_iteration(task: str, iteration: int, current_score: float, target: float) -> Tuple[List[Dict[str, Any]], float]:
    """Run one full PEES iteration and collect a display record per phase.

    The four simulated phases are invoked strictly in order: planner,
    executor, second executor (verification) and summary.

    Args:
        task: The user's task description.
        iteration: Current iteration number, forwarded to the summary phase.
        current_score: Score before this iteration.
        target: Target score, forwarded to the summary phase.

    Returns:
        A tuple ``(records, new_score)``. ``records`` holds four dicts (Plan,
        Execute, Evaluate, Summary), each with the keys ``phase``,
        ``phase_name``, ``content``, ``detail`` and ``timestamp``.
        ``new_score`` is the score reported by the summary phase.
    """
    # Run every phase first (order matters: each one sleeps and timestamps),
    # then shape the results for display.
    planned = simulate_planner(task)
    implemented = simulate_executor(task, planned["plan"])
    verified = simulate_executor2(task, implemented["result"])
    summarized = simulate_summary(iteration, current_score, target)

    implementation_detail = f"{implemented['code']}\n\n执行结果: {implemented['result']}"
    verification_detail = f"{verified['code']}\n\n验证结果: {verified['result']}"
    summary_detail = f"改进建议: {summarized['improvement']}\n\n当前分数: {summarized['score']:.2f}"

    records = [
        {
            "phase": "Plan",
            "phase_name": "计划",
            "content": planned["thought"],
            "detail": planned["plan"],
            "timestamp": planned["timestamp"],
        },
        {
            "phase": "Execute",
            "phase_name": "执行",
            "content": implemented["action"],
            "detail": implementation_detail,
            "timestamp": implemented["timestamp"],
        },
        {
            "phase": "Evaluate",
            "phase_name": "验证",
            "content": verified["action"],
            "detail": verification_detail,
            "timestamp": verified["timestamp"],
        },
        {
            "phase": "Summary",
            "phase_name": "总结",
            "content": summarized["reflection"],
            "detail": summary_detail,
            "timestamp": summarized["timestamp"],
        },
    ]
    return records, summarized["score"]
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Introductory Markdown shown at the top of the page.
_INTRO_MARKDOWN = """
# LoongFlow PEES Agent Demo

**LoongFlow** 是一个进化式 Agent 开发框架,采用 **PEES (Plan-Execute-Evaluate-Summary)** 思考范式。

---

### PEES 工作流程

```
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│  Plan   │ →  │ Execute │ →  │ Evaluate │ →  │ Summary │
│  计划   │    │  执行   │    │  验证   │    │  总结   │
└─────────┘    └─────────┘    └─────────┘    └─────────┘
     │                                            │
     │ ◀──── 迭代改进 ────│
     │
┌─────────┐
│ 目标达成 │
└─────────┘
```

- **Plan (P)**: 分析任务,制定战略计划
- **Execute (E1)**: 编写代码,实现功能
- **Evaluate (E2)**: 编写测试,验证功能
- **Summary (S)**: 反思结果,提取改进建议
"""

# Closing Markdown shown at the bottom of the page.
_ABOUT_MARKDOWN = """
---

### 关于 LoongFlow

LoongFlow 是一个面向复杂任务的进化式 Agent 框架,特别适用于:

- **数学推理**: 开放式数学问题求解
- **机器学习**: AutoML 和算法优化
- **代码生成**: 复杂编程任务
- **科学研究**: 实验设计和分析

了解更多: [GitHub](https://github.com/baidu-baige/LoongFlow)
"""

# Placeholder texts shown before any iteration has produced output.
_EMPTY_PHASE_MD = "*等待开始...*"
_EMPTY_CHART_SVG = '<svg width="400" height="250"><text x="200" y="130" text-anchor="middle" fill="#999">等待开始...</text></svg>'


def _format_phase_md(iteration: int, title: str, summary_label: str, record: Dict[str, Any]) -> str:
    """Render one phase record as Markdown with a collapsible detail section."""
    return "\n".join(
        [
            f"### 迭代 {iteration} - {title}",
            f"**时间**: {record['timestamp']}",
            "",
            f"{record['content']}",
            "",
            "<details>",
            f"<summary>{summary_label}</summary>",
            "",
            f"{record['detail']}",
            "",
            "</details>",
            "",
        ]
    )


def _render_score_chart(chart_data: List[Dict[str, Any]], width: int = 400, height: int = 250, padding: int = 40) -> str:
    """Render the score history as an inline SVG line chart.

    Every frame, including the one with a single data point, uses the same
    axes and the same 0.0-1.0 vertical scale, so points do not move between
    frames. An empty history yields the placeholder graphic.
    """
    if not chart_data:
        return _EMPTY_CHART_SVG

    plot_width = width - padding * 2
    plot_height = height - padding * 2
    # A single point needs no horizontal spacing; this also avoids dividing
    # by zero when there is only one entry.
    step = plot_width / (len(chart_data) - 1) if len(chart_data) > 1 else 0.0

    coords = [
        (padding + idx * step, padding + plot_height - item["score"] * plot_height)
        for idx, item in enumerate(chart_data)
    ]

    segments = "".join(
        f'<line x1="{x0}" y1="{y0}" x2="{x1}" y2="{y1}" stroke="#22c55e" stroke-width="3"/>'
        for (x0, y0), (x1, y1) in zip(coords, coords[1:])
    )
    markers = "".join(
        f'<circle cx="{x}" cy="{y}" r="6" fill="#22c55e" stroke="white" stroke-width="2"/>'
        f'<text x="{x}" y="{y - 15}" text-anchor="middle" font-size="12" fill="#333">{item["score"]:.2f}</text>'
        for (x, y), item in zip(coords, chart_data)
    )

    return "\n".join(
        [
            f'<svg width="{width}" height="{height}" style="border:1px solid #ccc; background:white; border-radius:8px;">',
            "<!-- Y轴标签 -->",
            '<text x="15" y="50" font-size="12" fill="#666">1.0</text>',
            f'<text x="15" y="{padding + plot_height / 2}" font-size="12" fill="#666">0.5</text>',
            f'<text x="15" y="{height - 20}" font-size="12" fill="#666">0.0</text>',
            "<!-- X轴标签 -->",
            f'<text x="{width / 2}" y="{height - 5}" font-size="12" fill="#666" text-anchor="middle">迭代次数</text>',
            "<!-- 折线 -->",
            segments,
            markers,
            "</svg>",
        ]
    )


def create_demo():
    """Build the Gradio Blocks interface for the PEES demo.

    The page contains a task input, sliders for the iteration limit and the
    target score, a status box, an SVG score chart and one tab per PEES
    phase. Clicking the run button streams one UI update per iteration.

    Returns:
        The constructed ``gr.Blocks`` application (not yet launched).
    """

    with gr.Blocks(title="LoongFlow PEES Demo", theme=gr.themes.Soft()) as demo:
        gr.Markdown(_INTRO_MARKDOWN)

        with gr.Row():
            with gr.Column(scale=2):
                task_input = gr.Textbox(
                    label="输入任务描述",
                    placeholder="例如: 帮我写一个待办事项应用 / 创建一个文件处理工具",
                    lines=3
                )

                with gr.Row():
                    max_iterations = gr.Slider(
                        minimum=1, maximum=10, value=5, step=1,
                        label="最大迭代次数"
                    )
                    target_score = gr.Slider(
                        minimum=0.5, maximum=1.0, value=0.85, step=0.05,
                        label="目标分数"
                    )

                run_btn = gr.Button("开始执行任务", variant="primary")

            with gr.Column(scale=1):
                status_output = gr.Textbox(
                    label="执行状态",
                    lines=5,
                    interactive=False
                )

        # Score evolution chart, rendered as raw SVG.
        score_display = gr.HTML(label="分数演进")

        # Hidden component; it is not wired to any event in this function.
        score_list = gr.JSON(label="分数历史", visible=False)

        gr.Markdown("### 迭代详情")

        # One tab per PEES phase.
        with gr.Tabs():
            with gr.Tab("Plan 计划"):
                plan_output = gr.Markdown("*等待开始...*")
            with gr.Tab("Execute 执行"):
                execute1_output = gr.Markdown("*等待开始...*")
            with gr.Tab("Evaluate 验证"):
                execute2_output = gr.Markdown("*等待开始...*")
            with gr.Tab("Summary 总结"):
                summary_output = gr.Markdown("*等待开始...*")

        def run_task(task: str, max_iter: int, target: float):
            """Run the simulated PEES loop, yielding UI updates as it goes.

            Each yield is a 6-tuple: status text, chart SVG, and the Markdown
            for the Plan, Execute, Evaluate and Summary tabs. The loop stops
            early once the score reaches ``target``.
            """
            if not task or not task.strip():
                yield "错误: 请输入任务描述", "", "", "", "", ""
                return

            chart_data = []
            current_score = 0.0
            total_iterations = int(max_iter)

            # Start from the placeholders so the final yield is well defined
            # even if the loop body never runs (max_iter < 1).
            svg = _EMPTY_CHART_SVG
            plan_md = exec1_md = exec2_md = summary_md = _EMPTY_PHASE_MD

            yield "状态: 准备执行任务...", svg, plan_md, exec1_md, exec2_md, summary_md

            for i in range(1, total_iterations + 1):
                results, current_score = run_pees_iteration(task, i, current_score, target)
                plan_result, execute1_result, execute2_result, summary_result = results[:4]

                plan_md = _format_phase_md(i, "Plan 计划", "查看计划详情", plan_result)
                exec1_md = _format_phase_md(i, "Execute 执行", "查看实现代码", execute1_result)
                exec2_md = _format_phase_md(i, "Evaluate 验证", "查看测试代码", execute2_result)
                summary_md = _format_phase_md(i, "Summary 总结", "查看改进建议", summary_result)

                chart_data.append({"iteration": i, "score": round(current_score, 2)})
                svg = _render_score_chart(chart_data)

                status = f"状态: 第 {i}/{total_iterations} 次迭代完成 (分数: {current_score:.2f})"
                yield status, svg, plan_md, exec1_md, exec2_md, summary_md

                # Stop as soon as the target score has been reached.
                if current_score >= target:
                    break

                time.sleep(0.3)

            final_status = f"状态: 任务完成\n最终分数: {current_score:.2f}\n总迭代次数: {len(chart_data)}"
            yield final_status, svg, plan_md, exec1_md, exec2_md, summary_md

        run_btn.click(
            fn=run_task,
            inputs=[task_input, max_iterations, target_score],
            outputs=[status_output, score_display, plan_output, execute1_output, execute2_output, summary_output]
        )

        gr.Markdown(_ABOUT_MARKDOWN)

    return demo
| |
|
| |
|
# Script entry point: build the Gradio app and start serving it.
if __name__ == "__main__":
    demo = create_demo()
    # "0.0.0.0" makes the server listen on all network interfaces, on port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)
| |
|