#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LoongFlow HuggingFace Spaces Demo

Showcases the PEES (Plan-Execute-Evaluate-Summary) evolutionary agent
workflow. Every phase is simulated locally with canned text and random
scores; no model is called.
"""

import random
import time
from typing import Any, Dict, List, Tuple

import gradio as gr
import pandas as pd  # noqa: F401  (not used in this module; kept from the original import list)


# ============================================================================
# PEES workflow simulation
# ============================================================================

# Keywords that select a canned sample. The Chinese keywords are needed
# because the UI placeholder suggests Chinese task descriptions, which never
# contain the English sample keys.
_SAMPLE_KEYWORDS = {
    "todo": ("todo", "待办"),
    "file": ("file", "文件"),
}

_CODE_SAMPLES = {
    "todo": '''```python
# Todo List App - 实现
from datetime import datetime


class TodoList:
    def __init__(self):
        self.tasks = []

    def add_task(self, title, priority="medium"):
        task = {
            "id": len(self.tasks) + 1,
            "title": title,
            "priority": priority,
            "done": False,
            "created_at": datetime.now()
        }
        self.tasks.append(task)
        return task

    def complete_task(self, task_id):
        for task in self.tasks:
            if task["id"] == task_id:
                task["done"] = True
                return True
        return False

    def get_pending(self):
        return [t for t in self.tasks if not t["done"]]
```''',
    "file": '''```python
# File Processor - 实现
import os
import shutil
from pathlib import Path


class FileProcessor:
    def __init__(self, input_dir, output_dir):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)

    def process_all(self):
        results = []
        for filepath in self.input_dir.rglob("*"):
            if filepath.is_file():
                dest = self.output_dir / filepath.relative_to(self.input_dir)
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(filepath, dest)
                results.append({"file": str(filepath), "status": "copied"})
        return results
```''',
    "default": '''```python
# Solution Implementation - 实现
class Solution:
    def __init__(self, task):
        self.task = task
        self.components = {}

    def analyze(self):
        """分析任务需求"""
        return {"requirements": "...", "constraints": "..."}

    def design(self):
        """设计解决方案"""
        return {"architecture": "...", "flow": "..."}

    def implement(self):
        """实现代码"""
        return {"code": "...", "tests": "..."}

    def run(self):
        return self.implement()
```''',
}

_TEST_SAMPLES = {
    "todo": '''```python
# 测试用例
def test_todo_list():
    todo = TodoList()

    # 测试添加任务
    task = todo.add_task("完成报告", "high")
    assert task["title"] == "完成报告"
    assert task["priority"] == "high"

    # 测试完成任务
    todo.complete_task(task["id"])
    assert task["done"] == True

    # 测试获取待办
    pending = todo.get_pending()
    assert len(pending) == 0

    print("所有测试通过!")
```''',
    "file": '''```python
# 测试用例
def test_file_processor():
    processor = FileProcessor("input", "output")

    # 创建测试文件
    os.makedirs("input", exist_ok=True)
    with open("input/test.txt", "w") as f:
        f.write("test")

    # 执行处理
    results = processor.process_all()

    # 验证结果
    assert os.path.exists("output/test.txt")
    assert len(results) == 1

    print("所有测试通过!")
```''',
    "default": '''```python
# 验证测试
def test_solution():
    solution = Solution("task")

    # 测试各个组件
    analysis = solution.analyze()
    assert analysis is not None

    design = solution.design()
    assert design is not None

    result = solution.run()
    assert result is not None

    print("所有测试通过!")
```''',
}


def _select_sample(task: str, samples: Dict[str, str]) -> str:
    """Return the canned sample whose keywords occur in *task*.

    Matching is case-insensitive and checks the sample kinds in the order
    they are listed in ``_SAMPLE_KEYWORDS``; ``samples["default"]`` is
    returned when nothing matches.
    """
    lowered = task.lower()
    for kind, keywords in _SAMPLE_KEYWORDS.items():
        if any(keyword in lowered for keyword in keywords):
            return samples[kind]
    return samples["default"]


def simulate_planner(task: str) -> Dict[str, Any]:
    """Simulate the Plan phase.

    Sleeps briefly, then returns a dict with a randomly chosen strategy
    sentence (``thought``), a Markdown plan that embeds *task* (``plan``),
    the role name and an ``HH:MM:SS`` timestamp.
    """
    time.sleep(0.3)

    strategies = [
        "我将采用分治策略,把任务分解为多个子问题分别解决。",
        "首先进行需求分析,然后设计系统架构,最后逐步实现。",
        "使用迭代式开发,从最小可行产品开始,逐步添加功能。",
        "采用自顶向下的方法,先定义接口,再实现具体逻辑。",
    ]

    return {
        "role": "Planner",
        "thought": random.choice(strategies),
        "plan": f"""
## 任务分析
- 用户需求: {task}

## 战略规划
1. 理解任务本质和目标
2. 设计整体架构方案
3. 制定分步实施计划
4. 预留扩展和优化空间
""".strip(),
        "timestamp": time.strftime("%H:%M:%S"),
    }


def simulate_executor(task: str, plan: str) -> Dict[str, Any]:
    """Simulate the first Execute phase (implementation).

    Sleeps briefly, then returns a dict whose ``code`` is the canned
    implementation sample matching *task*. *plan* is accepted for interface
    symmetry with the workflow but is not read.
    """
    time.sleep(0.5)

    return {
        "role": "Executor",
        "action": "编写并执行实现代码",
        "code": _select_sample(task, _CODE_SAMPLES),
        "result": "代码实现完成",
        "timestamp": time.strftime("%H:%M:%S"),
    }


def simulate_executor2(task: str, previous_result: str) -> Dict[str, Any]:
    """Simulate the second Execute phase (verification / tests).

    Sleeps briefly, then returns a dict whose ``code`` is the canned test
    sample matching *task*. *previous_result* is accepted but not read.
    """
    time.sleep(0.4)

    return {
        "role": "Executor2",
        "action": "编写并运行测试用例",
        "code": _select_sample(task, _TEST_SAMPLES),
        "result": "测试执行完成",
        "timestamp": time.strftime("%H:%M:%S"),
    }


def simulate_summary(iteration: int, score: float, target: float) -> Dict[str, Any]:
    """Simulate the Summary phase and produce the next score.

    The new score depends on the gap ``target - score``:

    * gap > 0.3: add a random gain in [0.18, 0.28];
    * gap > 0.1: close 50-70% of the gap, plus noise in [-0.05, 0.05];
    * otherwise: land slightly above *target* (by 0.02-0.08).

    The result is clamped to [0.15, 1.0]. A positive or negative reflection
    and improvement hint is chosen depending on whether the score dropped.
    *iteration* is accepted but not read.
    """
    time.sleep(0.3)

    reflections_positive = [
        "本次迭代成功实现了核心功能,分数有明显提升。",
        "代码结构良好,解决方案更优雅。",
        "测试覆盖完整,边界情况处理得当。",
        "验证通过,性能达到预期。",
    ]
    reflections_negative = [
        "本次迭代遇到一些问题,分数略有下降。",
        "实现方案有缺陷,需要重新调整。",
        "某些边界情况未处理好,导致扣分。",
        "测试未完全通过,需要修复。",
    ]
    improvements_positive = [
        "继续保持当前良好的实现方式",
        "建议扩展更多功能",
        "可以尝试更多边界情况",
    ]
    improvements_negative = [
        "需要修复实现的bug",
        "建议优化代码结构",
        "需要添加更多的错误处理",
        "考虑性能优化",
    ]

    # Overall upward trend; the last step must end above the target.
    gap = target - score

    if gap > 0.3:
        # Early phase: fast climb.
        base_gain = random.uniform(0.18, 0.28)
        new_score = score + base_gain
    elif gap > 0.1:
        # Middle phase: steady climb with a small oscillation.
        base_gain = gap * random.uniform(0.5, 0.7)
        oscillation = random.uniform(-0.05, 0.05)
        new_score = score + base_gain + oscillation
    else:
        # Late phase: finish just above the target.
        new_score = target + random.uniform(0.02, 0.08)

    # Clamp to the displayable range.
    new_score = max(0.15, min(1.0, new_score))

    if new_score >= score:
        reflection = random.choice(reflections_positive)
        improvement = random.choice(improvements_positive)
    else:
        reflection = random.choice(reflections_negative)
        improvement = random.choice(improvements_negative)

    return {
        "role": "Summary",
        "reflection": reflection,
        "improvement": improvement,
        "score": new_score,
        "timestamp": time.strftime("%H:%M:%S"),
    }


def run_pees_iteration(
    task: str, iteration: int, current_score: float, target: float
) -> Tuple[List[Dict[str, Any]], float]:
    """Run one full PEES iteration.

    Returns ``(results, new_score)`` where *results* holds four dicts, in
    the order Plan, Execute, Evaluate, Summary, each with the keys
    ``phase``, ``phase_name``, ``content``, ``detail`` and ``timestamp``.
    """
    results = []

    # Phase 1: Plan
    planner_result = simulate_planner(task)
    results.append({
        "phase": "Plan",
        "phase_name": "计划",
        "content": planner_result["thought"],
        "detail": planner_result["plan"],
        "timestamp": planner_result["timestamp"],
    })

    # Phase 2: Execute (implementation)
    executor_result = simulate_executor(task, planner_result["plan"])
    results.append({
        "phase": "Execute",
        "phase_name": "执行",
        "content": executor_result["action"],
        "detail": f"{executor_result['code']}\n\n执行结果: {executor_result['result']}",
        "timestamp": executor_result["timestamp"],
    })

    # Phase 3: Evaluate (verification)
    executor2_result = simulate_executor2(task, executor_result["result"])
    results.append({
        "phase": "Evaluate",
        "phase_name": "验证",
        "content": executor2_result["action"],
        "detail": f"{executor2_result['code']}\n\n验证结果: {executor2_result['result']}",
        "timestamp": executor2_result["timestamp"],
    })

    # Phase 4: Summary
    summary_result = simulate_summary(iteration, current_score, target)
    results.append({
        "phase": "Summary",
        "phase_name": "总结",
        "content": summary_result["reflection"],
        "detail": (
            f"改进建议: {summary_result['improvement']}\n\n"
            f"当前分数: {summary_result['score']:.2f}"
        ),
        "timestamp": summary_result["timestamp"],
    })

    return results, summary_result["score"]


# ============================================================================
# Gradio UI
# ============================================================================

_EMPTY_MD = "*等待开始...*"
_EMPTY_CHART = '等待开始...'

# (tab heading, collapsible-section label) for the four phases, in the same
# order as the results returned by run_pees_iteration.
_PHASE_LABELS = (
    ("Plan 计划", "查看计划详情"),
    ("Execute 执行", "查看实现代码"),
    ("Evaluate 验证", "查看测试代码"),
    ("Summary 总结", "查看改进建议"),
)

_CHART_WIDTH = 400
_CHART_HEIGHT = 250
_CHART_PADDING = 40
_CHART_COLOR = "#4f46e5"


def _format_phase_md(iteration: int, heading: str, label: str, result: Dict[str, Any]) -> str:
    """Build the Markdown shown in one phase tab for one iteration.

    The phase detail goes into a collapsible ``<details>`` section; the blank
    lines around it let the Markdown inside (code fences, lists) be parsed.
    """
    return "\n".join([
        f"### 迭代 {iteration} - {heading}",
        "",
        f"**时间**: {result['timestamp']}",
        "",
        f"{result['content']}",
        "",
        "<details>",
        f"<summary>{label}</summary>",
        "",
        f"{result['detail']}",
        "",
        "</details>",
        "",
    ])


def _render_score_chart(history: List[Dict[str, Any]]) -> str:
    """Render the score history as an inline SVG chart.

    *history* is a list of ``{"iteration": int, "score": float}`` dicts with
    scores expected in [0, 1]. An empty history yields the placeholder text,
    a single entry is drawn as one labelled point, and longer histories are
    drawn as a line chart with axes.
    """
    if not history:
        return _EMPTY_CHART

    width, height, padding = _CHART_WIDTH, _CHART_HEIGHT, _CHART_PADDING
    plot_width = width - padding * 2
    plot_height = height - padding * 2
    bottom = height - padding

    def y_of(score: float) -> float:
        # SVG y grows downwards, so score 1.0 maps to the top of the plot.
        return padding + plot_height - score * plot_height

    svg_open = (
        f'<svg width="{width}" height="{height}" viewBox="0 0 {width} {height}" '
        'xmlns="http://www.w3.org/2000/svg">'
    )

    if len(history) == 1:
        # A single point cannot form a line; draw just the point.
        score = history[0]["score"]
        cx, cy = width / 2, y_of(score)
        return (
            f'{svg_open}'
            f'<circle cx="{cx:.1f}" cy="{cy:.1f}" r="6" fill="{_CHART_COLOR}"/>'
            f'<text x="{cx:.1f}" y="{cy - 12:.1f}" text-anchor="middle" '
            f'font-size="14" fill="#555">分数: {score:.2f}</text>'
            '</svg>'
        )

    step = plot_width / (len(history) - 1)
    coords = [
        (padding + idx * step, y_of(item["score"]), item["score"])
        for idx, item in enumerate(history)
    ]

    lines_svg = "".join(
        f'<line x1="{x1:.1f}" y1="{y1:.1f}" x2="{x2:.1f}" y2="{y2:.1f}" '
        f'stroke="{_CHART_COLOR}" stroke-width="2"/>'
        for (x1, y1, _), (x2, y2, _) in zip(coords, coords[1:])
    )
    points_svg = "".join(
        f'<circle cx="{x:.1f}" cy="{y:.1f}" r="4" fill="{_CHART_COLOR}"/>'
        f'<text x="{x:.1f}" y="{y - 8:.1f}" text-anchor="middle" '
        f'font-size="11" fill="#555">{score:.2f}</text>'
        for x, y, score in coords
    )
    axes_svg = (
        f'<line x1="{padding}" y1="{padding}" x2="{padding}" y2="{bottom}" stroke="#999"/>'
        f'<line x1="{padding}" y1="{bottom}" x2="{width - padding}" y2="{bottom}" stroke="#999"/>'
        f'<text x="{padding - 8}" y="{padding + 4}" text-anchor="end" '
        'font-size="11" fill="#555">1.0</text>'
        f'<text x="{padding - 8}" y="{padding + plot_height / 2 + 4:.1f}" text-anchor="end" '
        'font-size="11" fill="#555">0.5</text>'
        f'<text x="{padding - 8}" y="{bottom + 4}" text-anchor="end" '
        'font-size="11" fill="#555">0.0</text>'
        f'<text x="{width / 2:.1f}" y="{height - 8}" text-anchor="middle" '
        'font-size="12" fill="#555">迭代次数</text>'
    )

    return f'{svg_open}{axes_svg}{lines_svg}{points_svg}</svg>'


def create_demo():
    """Build and return the Gradio Blocks interface for the demo."""
    with gr.Blocks(title="LoongFlow PEES Demo", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # LoongFlow PEES Agent Demo

        **LoongFlow** 是一个进化式 Agent 开发框架,采用 **PEES (Plan-Execute-Evaluate-Summary)** 思考范式。

        ---

        ### PEES 工作流程

        ```
        ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
        │   Plan   │ →  │ Execute  │ →  │ Evaluate │ →  │ Summary  │
        │   计划   │    │   执行   │    │   验证   │    │   总结   │
        └──────────┘    └──────────┘    └──────────┘    └──────────┘
             │                                               │
             │ ◀─────────────── 迭代改进 ───────────────────│
             │
        ┌──────────┐
        │ 目标达成 │
        └──────────┘
        ```

        - **Plan (P)**: 分析任务,制定战略计划
        - **Execute (E1)**: 编写代码,实现功能
        - **Evaluate (E2)**: 编写测试,验证功能
        - **Summary (S)**: 反思结果,提取改进建议
        """)

        with gr.Row():
            with gr.Column(scale=2):
                task_input = gr.Textbox(
                    label="输入任务描述",
                    placeholder="例如: 帮我写一个待办事项应用 / 创建一个文件处理工具",
                    lines=3,
                )

                with gr.Row():
                    max_iterations = gr.Slider(
                        minimum=1,
                        maximum=10,
                        value=5,
                        step=1,
                        label="最大迭代次数",
                    )
                    target_score = gr.Slider(
                        minimum=0.5,
                        maximum=1.0,
                        value=0.85,
                        step=0.05,
                        label="目标分数",
                    )

                run_btn = gr.Button("开始执行任务", variant="primary")

            with gr.Column(scale=1):
                status_output = gr.Textbox(
                    label="执行状态",
                    lines=5,
                    interactive=False,
                )

        # Score evolution, rendered as an inline SVG chart.
        score_display = gr.HTML(label="分数演进")

        # Score history (hidden; not wired to any event).
        score_list = gr.JSON(label="分数历史", visible=False)

        gr.Markdown("### 迭代详情")

        # One tab per PEES phase.
        with gr.Tabs():
            with gr.Tab("Plan 计划"):
                plan_output = gr.Markdown("*等待开始...*")
            with gr.Tab("Execute 执行"):
                execute1_output = gr.Markdown("*等待开始...*")
            with gr.Tab("Evaluate 验证"):
                execute2_output = gr.Markdown("*等待开始...*")
            with gr.Tab("Summary 总结"):
                summary_output = gr.Markdown("*等待开始...*")

        def run_task(task: str, max_iter: int, target: float):
            """Stream UI updates while iterating until the target is reached.

            Yields 6-tuples matching the click handler's outputs: status
            text, chart HTML, and the Markdown for the four phase tabs.
            Stops early once the score reaches *target*.
            """
            if not task or not task.strip():
                yield "错误: 请输入任务描述", "", "", "", "", ""
                return

            total = int(max_iter)
            chart_data = []
            current_score = 0.0

            # Bound before the loop so the final yield is always well-defined.
            svg = _EMPTY_CHART
            plan_md = exec1_md = exec2_md = summary_md = _EMPTY_MD

            # Initial state.
            yield "状态: 准备执行任务...", svg, plan_md, exec1_md, exec2_md, summary_md

            for i in range(1, total + 1):
                results, current_score = run_pees_iteration(task, i, current_score, target)

                # results is ordered Plan, Execute, Evaluate, Summary.
                plan_md, exec1_md, exec2_md, summary_md = (
                    _format_phase_md(i, heading, label, result)
                    for (heading, label), result in zip(_PHASE_LABELS, results)
                )

                chart_data.append({"iteration": i, "score": round(current_score, 2)})
                svg = _render_score_chart(chart_data)

                # Refresh the UI after every iteration.
                status = f"状态: 第 {i}/{total} 次迭代完成 (分数: {current_score:.2f})"
                yield status, svg, plan_md, exec1_md, exec2_md, summary_md

                # Stop as soon as the target is reached.
                if current_score >= target:
                    break

                time.sleep(0.3)

            final_status = (
                f"状态: 任务完成\n最终分数: {current_score:.2f}\n总迭代次数: {len(chart_data)}"
            )
            yield final_status, svg, plan_md, exec1_md, exec2_md, summary_md

        run_btn.click(
            fn=run_task,
            inputs=[task_input, max_iterations, target_score],
            outputs=[
                status_output,
                score_display,
                plan_output,
                execute1_output,
                execute2_output,
                summary_output,
            ],
        )

        gr.Markdown("""
        ---

        ### 关于 LoongFlow

        LoongFlow 是一个面向复杂任务的进化式 Agent 框架,特别适用于:

        - **数学推理**: 开放式数学问题求解
        - **机器学习**: AutoML 和算法优化
        - **代码生成**: 复杂编程任务
        - **科学研究**: 实验设计和分析

        了解更多: [GitHub](https://github.com/baidu-baige/LoongFlow)
        """)

    return demo


if __name__ == "__main__":
    demo = create_demo()
    demo.launch(server_name="0.0.0.0", server_port=7860)