from typing import List, Dict, Tuple import os import json from copy import deepcopy import re import sys import time import xml.etree.ElementTree as ET # 假设 Pangu 模型在这里,如果位置不同请修改 try: from models.Pangu import Pangu except ImportError: print("Warning: Could not import Pangu model. Debugging might fail if Pangu is not defined.") from .Base import BaseStrategy from models.Base import BaseModel from results.Results import Results from datasets.Dataset import Dataset from datasets.APPSDataset import APPSDataset from datasets.XCodeDataset import XCodeDataset from datasets.HumanEvalDataset import HumanDataset from datasets.CodeContestDataset import CodeContestDataset from datasets.MBPPDataset import MBPPDataset class DebateCoder(BaseStrategy): """ Multi-Agent Debate-Based Planning Strategy 基于多智能体辩论的规划策略,通过三个不同角色的代理进行多轮辩论来生成高质量的代码规划。 """ def __init__( self, rounds: int = 3, early_stop_threshold: float = 95.0, t: int = 3, # Debug 轮数 *args, **kwargs ): super().__init__(*args, **kwargs) self.rounds = rounds self.early_stop_threshold = early_stop_threshold self.t = t self.log_dir = "./outputs/responses" # [Fix] 初始化 token 计数器,解决 AttributeError self.pr_tok = 0 self.com_tok = 0 os.makedirs(self.log_dir, exist_ok=True) def log_response(self, response: str, stage: str, item: dict): """记录响应到日志文件""" log_file = os.path.join(self.log_dir, f"DebateCoder_{self.model.__class__.__name__}_responses.log") try: with open(log_file, "a", encoding="utf-8") as f: from datetime import datetime f.write(f"\n---\n") f.write(f"# timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"# dataset: {self.data.__class__.__name__}\n") f.write(f"# id: {item.get('task_id', item.get('name', 'unknown'))}\n") f.write(f"# kind: {stage}\n") f.write(response) f.write(f"\n") except Exception as e: print(f"Failed to log response: {e}") def get_agent_role_prompt(self, role: str) -> str: """获取不同角色的系统提示""" role_prompts = { "UA": """You are a User Agent (UA) focusing on functionality completeness and usability. Your responsibility is to ensure: - The solution meets all user requirements - The code is easy to understand and use - Edge cases from a user perspective are handled - The interface is intuitive and clear""", "TA": """You are a Technical Agent (TA) focusing on technical feasibility and performance efficiency. Your responsibility is to ensure: - The solution is technically sound and implementable - The algorithm is efficient with optimal time/space complexity - Best practices and design patterns are followed - The code is maintainable and scalable""", "QA": """You are a QA Agent (QA) focusing on robustness with boundary conditions and exception handling. Your responsibility is to ensure: - All edge cases and boundary conditions are handled - Exception handling is comprehensive - Input validation is thorough - The solution is stable and reliable""" } return role_prompts.get(role, "") def generate_initial_plan(self, item: dict, role: str) -> str: """生成初始计划(第一轮)""" role_prompt = self.get_agent_role_prompt(role) problem_description = self.data.get_prompt(item) messages = [ { "role": "system", "content": role_prompt }, { "role": "user", "content": f"""Given the following programming problem, create a detailed step-by-step plan to solve it from your perspective as {role}. # Problem: {problem_description} # Your Task: Generate a clear, actionable plan with specific steps. Focus on aspects that align with your role's responsibilities. # Output Format: Provide a numbered list of steps to solve this problem.""" } ] print(f"\n{'='*60}") print(f"[Round 1] {role} - Initial Planning") print(f"{'='*60}") print("messages:", messages) response, pr_tok, com_tok = self.gpt_chat(messages) item['api_calls'] = item.get('api_calls', 0) + 1 # 累加 token self.pr_tok += pr_tok self.com_tok += com_tok self.log_response(response, f"Round-1-{role}-Plan", item) print(f"\n{role} Plan:\n{response[:300]}...") return response def generate_debate_plan(self, item: dict, role: str, round_num: int, own_prev_plan: str, other_plans: Dict[str, str]) -> str: """生成辩论后的计划(第2轮及之后)""" role_prompt = self.get_agent_role_prompt(role) problem_description = self.data.get_prompt(item) # 构建其他代理的计划描述 other_plans_text = "" for other_role, other_plan in other_plans.items(): other_plans_text += f"\n## {other_role}'s Previous Plan:\n{other_plan}\n" messages = [ { "role": "system", "content": role_prompt }, { "role": "user", "content": f"""Given the following programming problem, you previously created a plan. Now, review the plans from other agents and refine your plan. # Problem: {problem_description} # Your Previous Plan: {own_prev_plan} # Other Agents' Plans: {other_plans_text} # Your Task: 1. Analyze the strengths and weaknesses of other agents' plans 2. Compare them with your previous plan 3. Refine your plan by: - Incorporating good ideas from others - Addressing issues you identified - Maintaining focus on your role's responsibilities ({role}) # Output Format: Provide an improved, numbered list of steps to solve this problem.""" } ] print(f"\n{'='*60}") print(f"[Round {round_num}] {role} - Refining Plan") print(f"{'='*60}") response, pr_tok, com_tok = self.gpt_chat(messages) item['api_calls'] = item.get('api_calls', 0) + 1 # 累加 token self.pr_tok += pr_tok self.com_tok += com_tok self.log_response(response, f"Round-{round_num}-{role}-Plan", item) print(f"\n{role} Refined Plan:\n{response[:300]}...") return response def fuse_plans(self, item: dict, plans: Dict[str, str]) -> str: """融合三个代理的最终计划""" problem_description = self.data.get_prompt(item) plans_text = "" for role, plan in plans.items(): plans_text += f"\n## {role}'s Final Plan:\n{plan}\n" messages = [ { "role": "user", "content": f"""Given the following programming problem and three different planning perspectives, create a comprehensive final plan that integrates the strengths of all three approaches. # Problem: {problem_description} # Three Agents' Final Plans: {plans_text} # Your Task: Synthesize these three plans into ONE cohesive, comprehensive plan that: 1. Ensures functionality completeness and usability (from UA) 2. Maintains technical feasibility and efficiency (from TA) 3. Handles edge cases and exceptions robustly (from QA) # Output Format: Provide a clear, numbered list of steps that represents the best synthesis of all three perspectives.""" } ] print(f"\n{'='*60}") print(f"[Fusion] Combining Final Plans") print(f"{'='*60}") fused_plan, pr_tok, com_tok = self.gpt_chat(messages) item['api_calls'] = item.get('api_calls', 0) + 1 # 累加 token self.pr_tok += pr_tok self.com_tok += com_tok self.log_response(fused_plan, "Fused-Plan", item) print(f"\nFused Plan:\n{fused_plan[:300]}...") return fused_plan def generate_code_from_plan(self, item: dict, final_plan: str) -> str: """根据融合后的计划生成代码""" problem_description = self.data.get_prompt(item) messages = [ { "role": "user", "content": f"""Given the following programming problem and a detailed plan, implement the solution in {self.language}. # Problem: {problem_description} # Detailed Plan: {final_plan} # Your Task: Write complete, working code that implements this plan. Ensure: - The code follows the plan's steps - All edge cases are handled - The code is clean and well-structured - It passes all test cases # Output Format: Provide only the code implementation, without additional explanations.""" } ] print(f"\n{'='*60}") print(f"[Code Generation] Implementing Solution") print(f"{'='*60}") code, pr_tok, com_tok = self.gpt_chat(messages) item['api_calls'] = item.get('api_calls', 0) + 1 # 累加 token self.pr_tok += pr_tok self.com_tok += com_tok try: code = self.parse_code(code) except IndexError as e: print(f"parse_code raised IndexError: {e}. Will retry final code generation.", flush=True) max_code_retries = 2 parsed_success = False for cretry in range(1, max_code_retries + 1): retry_raw, pr_tok_r, com_tok_r = self.gpt_chat( messages ) item['api_calls'] = item.get('api_calls', 0) + 1 self.pr_tok += pr_tok_r self.com_tok += com_tok_r try: retry_parsed = self.parse_code(retry_raw) code = retry_parsed parsed_success = True self.log_response(retry_raw, f"final_code_retry_success-{cretry}", item) break except Exception as e2: print(f"Retry {cretry} parse_code failed: {e2}", flush=True) self.log_response(retry_raw, f"final_code_retry_failed-{cretry}", item) if not parsed_success: print("Final code generation: retries exhausted, using default fallback code.", flush=True) lang = (self.language or "").lower() if 'python' in lang: code = 'print("")' elif 'java' in lang: code = 'public class Main { public static void main(String[] args) { } }' elif 'c++' in lang or 'cpp' in lang: code = 'int main() { return 0; }' elif re.search(r"\bc\b", lang): code = 'int main() { return 0; }' elif 'js' in lang or 'node' in lang or 'javascript' in lang: code = 'console.log("")' else: code = '' # log that we used default code self.log_response(code, "final_code_fallback", item) self.log_response(code, "Generated-Code", item) print(f"\nGenerated Code:\n{code[:300]}...") return code def evaluate_plan_confidence(self, item: dict, plan: str) -> float: """评估计划的置信度""" prompt = f"""Given a programming problem and a plan to solve it, evaluate the quality and completeness of the plan. # Problem: {item.get('prompt', item.get('description', ''))} # Plan: {plan} Rate the plan's quality on a scale of 0-100, where: - 90-100: Excellent plan, covers all cases, clear implementation steps - 70-89: Good plan, minor gaps but workable - 50-69: Acceptable plan, has some issues - 0-49: Poor plan, major gaps or errors Output ONLY a number between 0-100, nothing else.""" messages = [{"role": "user", "content": prompt}] try: response, pr_tok, com_tok = self.gpt_chat(messages) # 累加 token self.pr_tok += pr_tok self.com_tok += com_tok # 提取数字 import re match = re.search(r'\b(\d+(?:\.\d+)?)\b', response) if match: confidence = float(match.group(1)) return min(100.0, max(0.0, confidence)) return 50.0 # 默认中等置信度 except Exception as e: if hasattr(self, 'verbose') and self.verbose: print(f"Failed to evaluate confidence: {e}") return 50.0 def parse_code(self, response: str) -> str: if "```" not in response: return response code_pattern = r'```((.|\n)*?)```' if "```Python" in response: code_pattern = r'```Python((.|\n)*?)```' if "```Python3" in response: code_pattern = r'```Python3((.|\n)*?)```' if "```python" in response: code_pattern = r'```python((.|\n)*?)```' if "```python3" in response: code_pattern = r'```python3((.|\n)*?)```' if "```C" in response: code_pattern = r'```C((.|\n)*?)```' if "```c" in response: code_pattern = r'```c((.|\n)*?)```' if "```C++" in response: code_pattern = r'```C\+\+((.|\n)*?)```' if "```c++" in response: code_pattern = r'```c\+\+((.|\n)*?)```' if "```Java" in response: code_pattern = r'```Java((.|\n)*?)```' if "```java" in response: code_pattern = r'```java((.|\n)*?)```' if "```Node" in response: code_pattern = r'```Node((.|\n)*?)```' if "```node" in response: code_pattern = r'```node((.|\n)*?)```' if "```Rust" in response: code_pattern = r'```Rust((.|\n)*?)```' if "```rust" in response: code_pattern = r'```rust((.|\n)*?)```' if "```PHP" in response: code_pattern = r'```PHP((.|\n)*?)```' if "```php" in response: code_pattern = r'```php((.|\n)*?)```' if "```Go" in response: code_pattern = r'```Go((.|\n)*?)```' if "```go" in response: code_pattern = r'```go((.|\n)*?)```' if "```Ruby" in response: code_pattern = r'```Ruby((.|\n)*?)```' if "```ruby" in response: code_pattern = r'```ruby((.|\n)*?)```' if "```C#" in response: code_pattern = r'```C#((.|\n)*?)```' if "```c#" in response: code_pattern = r'```c#((.|\n)*?)```' if "```csharp" in response: code_pattern = r'```csharp((.|\n)*?)```' code_blocks = re.findall(code_pattern, response, re.DOTALL) if type(code_blocks[-1]) == tuple or type(code_blocks[-1]) == list: code_str = "\n".join(code_blocks[-1]) elif type(code_blocks[-1]) == str: code_str = code_blocks[-1] else: code_str = response return code_str def Reviewer_pangu1b(self,problem_prompt: str, plan: str, code: str, test_log: str, task_id: str = "unknown") -> Tuple[str, int, int]: """ Reviewer 角色:分析代码失败原因并提供修复计划。 Args: problem_prompt (str): 题目描述。 code (str):生成的代码。 test_log (str): 测试失败的日志报告。 task_id (str): 当前任务的ID,用于日志打印 (对应原代码中的 i)。 Returns: Tuple[str, int, int]: 返回 (分析结果, prompt_token消耗, completion_token消耗) """ # 构造 Prompt reviewer_input = [ { "role": "user", "content": f"You are an expert programmer. The following code was generated to solve a problem but failed sample test cases.\n\n## Problem:\n{problem_prompt}\n\n## Plan:\n{plan}\n \n## Generated Code:\n```\n{code}\n```\n\n## Test Report:\n{test_log}\n\nPlease analyze why the code failed and provide a specific plan to fix it. Do not generate the full code, just the analysis and fix plan." } ] print(f"Input for Reviewer analysis: {task_id}") try: # 假设 Pangu 类已经在外部导入 # 如果 Pangu 模型加载开销很大,建议将 pangu_model 作为一个参数传入函数,而不是在内部实例化 pangu_model = Pangu() analysis, q_pr_tok, q_com_tok = pangu_model.prompt(reviewer_input) print(f"Reviewer Analysis: {analysis}", flush=True) return analysis, q_pr_tok, q_com_tok except NameError: print("Error: Pangu model class not found. Skipping detailed analysis.") return "Code failed sample tests. Please check logic and edge cases.", 0, 0 except Exception as e: print(f"Error during Reviewer analysis: {e}") return "Code failed sample tests. Please check logic and edge cases.", 0, 0 def debugging(self, plan: list, code: str, item: dict, algorithm_prompt: str) -> str: passed = False planning, _, _ = plan # plan_code_response = f"## Planning: {planning}\n## Code:\n```\n{code}\n```" if type(self.data) == APPSDataset or type(self.data) == CodeContestDataset or type(self.data) == XCodeDataset: std_input_prompt = "## Note: Strictly follow the input and output format. The input should be taken from Standard input and output should be given to standard output. If you are writing a function then after the function definition take input using `input()` function then call the function with specified parameters and finally print the output of the function. Do not add extra print statement otherwise it will failed the test cases." else: std_input_prompt = "" for i in range(1, self.t + 1): passed, test_log = self.data.evaluate_sample_io( item, code, self.language ) if passed: print(f"DEBUGGING: Test passed at round {i}") break problem_prompt = self.data.get_prompt(item) # 调用 Reviewer analysis, pr_cost, com_cost = self.Reviewer_pangu1b( problem_prompt=problem_prompt, plan = planning, code=code, test_log=test_log, task_id=i ) print(f"pr_cost: {pr_cost}, com_cost: {com_cost}") # 更新 Token 计数 self.pr_tok += pr_cost self.com_tok += com_cost print(f"Input for improving code generation: {i}") input_for_improving_code = [ { "role": "user", "content": f" You are an expert competitive programmer. Your task is to fix the provided {self.language} code based on the Expert Analysis. The original code failed sample test cases.\n\n### Problem Description: \n{self.data.get_prompt(item)}\n ### Original Code (Buggy): \n```\n{code}\n```\n### Expert Analysis & Fix Plan:\n{analysis}\n\nImprove your code to solve the problem correctly based on this analysis.\n### Requirement: 1. Read the Expert Analysis carefully. 2. {std_input_prompt} 3. Generate the corrected code.\n\n----------------\nImportant:\n{std_input_prompt}\n## Your response must contain the modified planning and then the {self.language} code inside ``` block to solve this problem." } ] print("\n\n________________________") print("Input for improving code generation: ") print(input_for_improving_code[0]['content'], flush=True) response, pr_tok_1, com_tok_1 = self.gpt_chat( input_for_improving_code ) item['api_calls'] += 1 self.pr_tok += pr_tok_1 self.com_tok += com_tok_1 raw_code = deepcopy(code) try: # 注意:这里我们解析的是新生成的 response,而不是旧的 code code = self.parse_code(response) except IndexError as e: print(f"parse_code raised IndexError: {e}. Will retry final code generation.", flush=True) max_code_retries = 2 parsed_success = False for cretry in range(1, max_code_retries + 1): retry_raw, pr_tok_r, com_tok_r = self.gpt_chat( input_for_improving_code ) item['api_calls'] = item.get('api_calls', 0) + 1 self.pr_tok += pr_tok_r self.com_tok += com_tok_r try: retry_parsed = self.parse_code(retry_raw) code = retry_parsed parsed_success = True self.log_response(retry_raw, f"final_code_retry_success-{cretry}", item) break except Exception as e2: print(f"Retry {cretry} parse_code failed: {e2}", flush=True) self.log_response(retry_raw, f"final_code_retry_failed-{cretry}", item) if not parsed_success: print("Final code generation: retries exhausted, using raw code.", flush=True) code = raw_code self.log_response(code, "final_code_fallback", item) return code def run_single_pass(self, item: dict): """执行单个问题的多智能体辩论流程""" self.pr_tok = 0 self.com_tok = 0 print("\n" + "="*80) print(f"Processing: {item.get('task_id', item.get('name', 'unknown'))}") print("="*80) agents = ["UA", "TA", "QA"] plans_history = {agent: [] for agent in agents} actual_rounds = 1 # Round 1: 生成初始计划 print(f"\n{'#'*80}") print(f"# ROUND 1: Initial Planning") print(f"{'#'*80}") current_plans = {} for agent in agents: plan = self.generate_initial_plan(item, agent) plans_history[agent].append(plan) current_plans[agent] = plan # 评估第一轮 if self.rounds > 1: confidences = {} print(f"\n{'='*60}") print("Evaluating Round 1 Plan Quality...") print(f"{'='*60}") for agent in agents: conf = self.evaluate_plan_confidence(item, current_plans[agent]) confidences[agent] = conf print(f"{agent} Plan Confidence: {conf:.1f}%") avg_confidence = sum(confidences.values()) / len(confidences) print(f"Average Confidence: {avg_confidence:.1f}%") if avg_confidence >= self.early_stop_threshold: print(f"\n✓ High confidence achieved! Skipping remaining rounds.") item['actual_rounds'] = actual_rounds item['early_stopped'] = True item['final_confidence'] = avg_confidence final_plan = self.fuse_plans(item, current_plans) code = self.generate_code_from_plan(item, final_plan) # Debugging (Early Stop) print(f"\n{'#'*80}") print(f"# DEBUGGING (Early Stop)") print(f"{'#'*80}") code = self.debugging([final_plan, None, None], code, item, "Please strictly follow the plan.") return code, self.pr_tok, self.com_tok # Round 2-R for round_num in range(2, self.rounds + 1): actual_rounds = round_num print(f"\n{'#'*80}") print(f"# ROUND {round_num}: Debate and Refinement") print(f"{'#'*80}") new_plans = {} for agent in agents: other_plans = {a: current_plans[a] for a in agents if a != agent} refined_plan = self.generate_debate_plan( item, agent, round_num, current_plans[agent], other_plans ) plans_history[agent].append(refined_plan) new_plans[agent] = refined_plan current_plans = new_plans if round_num < self.rounds: confidences = {} print(f"\n{'='*60}") print(f"Evaluating Round {round_num} Plan Quality...") print(f"{'='*60}") for agent in agents: conf = self.evaluate_plan_confidence(item, current_plans[agent]) confidences[agent] = conf print(f"{agent} Plan Confidence: {conf:.1f}%") avg_confidence = sum(confidences.values()) / len(confidences) if avg_confidence >= self.early_stop_threshold: print(f"\n✓ High confidence achieved! Skipping remaining rounds.") item['actual_rounds'] = actual_rounds item['early_stopped'] = True item['final_confidence'] = avg_confidence break if 'actual_rounds' not in item: item['actual_rounds'] = actual_rounds item['early_stopped'] = False # Fusion print(f"\n{'#'*80}") print(f"# FUSION: Combining All Perspectives") print(f"{'#'*80}") final_plan = self.fuse_plans(item, current_plans) # Code Generation print(f"\n{'#'*80}") print(f"# CODE GENERATION") print(f"{'#'*80}") code = self.generate_code_from_plan(item, final_plan) # Debugging print(f"\n{'#'*80}") print(f"# DEBUGGING") print(f"{'#'*80}") code = self.debugging([final_plan, None, None], code, item, "Please strictly follow the plan.") print(f"\n{'='*80}") print(f"Completed: {item.get('task_id', item.get('name', 'unknown'))}") # [Fix] 返回累积的 token 数 return code, self.pr_tok, self.com_tok