| | from typing import List, Dict, Tuple |
| | import os |
| | import json |
| | from copy import deepcopy |
| | import re |
| | import sys |
| | import time |
| |
|
| | import xml.etree.ElementTree as ET |
| |
|
| |
|
| |
|
| | |
| | try: |
| | from models.Pangu import Pangu |
| | except ImportError: |
| | print("Warning: Could not import Pangu model. Debugging might fail if Pangu is not defined.") |
| |
|
| | from .Base import BaseStrategy |
| | from models.Base import BaseModel |
| | from results.Results import Results |
| | from datasets.Dataset import Dataset |
| | from datasets.APPSDataset import APPSDataset |
| | from datasets.XCodeDataset import XCodeDataset |
| | from datasets.HumanEvalDataset import HumanDataset |
| | from datasets.CodeContestDataset import CodeContestDataset |
| | from datasets.MBPPDataset import MBPPDataset |
| |
|
| |
|
| | class DebateCoder(BaseStrategy): |
| | """ |
| | Multi-Agent Debate-Based Planning Strategy |
| | |
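| | A planning strategy based on multi-agent debate: three agents with different roles debate over several rounds to produce a high-quality code plan. |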
| | """ |
| | |
| | def __init__( |
| | self, |
| | rounds: int = 3, |
| | early_stop_threshold: float = 95.0, |
| | t: int = 3, |
| | *args, |
| | **kwargs |
| | ): |
| | super().__init__(*args, **kwargs) |
| | self.rounds = rounds |
| | self.early_stop_threshold = early_stop_threshold |
| | self.t = t |
| | self.log_dir = "./outputs/responses" |
| | |
| | |
| | self.pr_tok = 0 |
| | self.com_tok = 0 |
| | |
| | os.makedirs(self.log_dir, exist_ok=True) |
| | |
| | def log_response(self, response: str, stage: str, item: dict): |
| | """记录响应到日志文件""" |
| | log_file = os.path.join(self.log_dir, f"DebateCoder_{self.model.__class__.__name__}_responses.log") |
| | |
| | try: |
| | with open(log_file, "a", encoding="utf-8") as f: |
| | from datetime import datetime |
| | f.write(f"\n---\n") |
| | f.write(f"# timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") |
| | f.write(f"# dataset: {self.data.__class__.__name__}\n") |
| | f.write(f"# id: {item.get('task_id', item.get('name', 'unknown'))}\n") |
| | f.write(f"# kind: {stage}\n") |
| | f.write(response) |
| | f.write(f"\n") |
| | except Exception as e: |
| | print(f"Failed to log response: {e}") |
| | |
| | def get_agent_role_prompt(self, role: str) -> str: |
| | """获取不同角色的系统提示""" |
| | role_prompts = { |
| | "UA": """You are a User Agent (UA) focusing on functionality completeness and usability. |
| | Your responsibility is to ensure: |
| | - The solution meets all user requirements |
| | - The code is easy to understand and use |
| | - Edge cases from a user perspective are handled |
| | - The interface is intuitive and clear""", |
| | |
| | "TA": """You are a Technical Agent (TA) focusing on technical feasibility and performance efficiency. |
| | Your responsibility is to ensure: |
| | - The solution is technically sound and implementable |
| | - The algorithm is efficient with optimal time/space complexity |
| | - Best practices and design patterns are followed |
| | - The code is maintainable and scalable""", |
| | |
| | "QA": """You are a QA Agent (QA) focusing on robustness with boundary conditions and exception handling. |
| | Your responsibility is to ensure: |
| | - All edge cases and boundary conditions are handled |
| | - Exception handling is comprehensive |
| | - Input validation is thorough |
| | - The solution is stable and reliable""" |
| | } |
| | return role_prompts.get(role, "") |
| | |
| | def generate_initial_plan(self, item: dict, role: str) -> str: |
| | """生成初始计划(第一轮)""" |
| | role_prompt = self.get_agent_role_prompt(role) |
| | problem_description = self.data.get_prompt(item) |
| | messages = [ |
| | { |
| | "role": "system", |
| | "content": role_prompt |
| | }, |
| | { |
| | "role": "user", |
| | "content": f"""Given the following programming problem, create a detailed step-by-step plan to solve it from your perspective as {role}. |
| | |
| | # Problem: |
| | {problem_description} |
| | |
| | # Your Task: |
| | Generate a clear, actionable plan with specific steps. Focus on aspects that align with your role's responsibilities. |
| | |
| | # Output Format: |
| | Provide a numbered list of steps to solve this problem.""" |
| | } |
| | ] |
| | |
| | print(f"\n{'='*60}") |
| | print(f"[Round 1] {role} - Initial Planning") |
| | print(f"{'='*60}") |
| | print("messages:", messages) |
| | response, pr_tok, com_tok = self.gpt_chat(messages) |
| | item['api_calls'] = item.get('api_calls', 0) + 1 |
| | |
| | |
| | self.pr_tok += pr_tok |
| | self.com_tok += com_tok |
| | |
| | self.log_response(response, f"Round-1-{role}-Plan", item) |
| | |
| | print(f"\n{role} Plan:\n{response[:300]}...") |
| | |
| | return response |
| | |
| | def generate_debate_plan(self, item: dict, role: str, round_num: int, |
| | own_prev_plan: str, other_plans: Dict[str, str]) -> str: |
| | """生成辩论后的计划(第2轮及之后)""" |
| | role_prompt = self.get_agent_role_prompt(role) |
| | problem_description = self.data.get_prompt(item) |
| | |
| | |
| | other_plans_text = "" |
| | for other_role, other_plan in other_plans.items(): |
| | other_plans_text += f"\n## {other_role}'s Previous Plan:\n{other_plan}\n" |
| | |
| | messages = [ |
| | { |
| | "role": "system", |
| | "content": role_prompt |
| | }, |
| | { |
| | "role": "user", |
| | "content": f"""Given the following programming problem, you previously created a plan. Now, review the plans from other agents and refine your plan. |
| | |
| | # Problem: |
| | {problem_description} |
| | |
| | # Your Previous Plan: |
| | {own_prev_plan} |
| | |
| | # Other Agents' Plans: |
| | {other_plans_text} |
| | |
| | # Your Task: |
| | 1. Analyze the strengths and weaknesses of other agents' plans |
| | 2. Compare them with your previous plan |
| | 3. Refine your plan by: |
| | - Incorporating good ideas from others |
| | - Addressing issues you identified |
| | - Maintaining focus on your role's responsibilities ({role}) |
| | |
| | # Output Format: |
| | Provide an improved, numbered list of steps to solve this problem.""" |
| | } |
| | ] |
| | |
| | print(f"\n{'='*60}") |
| | print(f"[Round {round_num}] {role} - Refining Plan") |
| | print(f"{'='*60}") |
| | |
| | response, pr_tok, com_tok = self.gpt_chat(messages) |
| | item['api_calls'] = item.get('api_calls', 0) + 1 |
| | |
| | |
| | self.pr_tok += pr_tok |
| | self.com_tok += com_tok |
| | |
| | self.log_response(response, f"Round-{round_num}-{role}-Plan", item) |
| | |
| | print(f"\n{role} Refined Plan:\n{response[:300]}...") |
| | |
| | return response |
| | |
| | def fuse_plans(self, item: dict, plans: Dict[str, str]) -> str: |
| | """融合三个代理的最终计划""" |
| | problem_description = self.data.get_prompt(item) |
| | |
| | plans_text = "" |
| | for role, plan in plans.items(): |
| | plans_text += f"\n## {role}'s Final Plan:\n{plan}\n" |
| | |
| | messages = [ |
| | { |
| | "role": "user", |
| | "content": f"""Given the following programming problem and three different planning perspectives, create a comprehensive final plan that integrates the strengths of all three approaches. |
| | |
| | # Problem: |
| | {problem_description} |
| | |
| | # Three Agents' Final Plans: |
| | {plans_text} |
| | |
| | # Your Task: |
| | Synthesize these three plans into ONE cohesive, comprehensive plan that: |
| | 1. Ensures functionality completeness and usability (from UA) |
| | 2. Maintains technical feasibility and efficiency (from TA) |
| | 3. Handles edge cases and exceptions robustly (from QA) |
| | |
| | # Output Format: |
| | Provide a clear, numbered list of steps that represents the best synthesis of all three perspectives.""" |
| | } |
| | ] |
| | |
| | print(f"\n{'='*60}") |
| | print(f"[Fusion] Combining Final Plans") |
| | print(f"{'='*60}") |
| | |
| | fused_plan, pr_tok, com_tok = self.gpt_chat(messages) |
| | item['api_calls'] = item.get('api_calls', 0) + 1 |
| | |
| | |
| | self.pr_tok += pr_tok |
| | self.com_tok += com_tok |
| | |
| | self.log_response(fused_plan, "Fused-Plan", item) |
| | |
| | print(f"\nFused Plan:\n{fused_plan[:300]}...") |
| | |
| | return fused_plan |
| | |
| | def generate_code_from_plan(self, item: dict, final_plan: str) -> str: |
| | """根据融合后的计划生成代码""" |
| | problem_description = self.data.get_prompt(item) |
| | |
| | messages = [ |
| | { |
| | "role": "user", |
| | "content": f"""Given the following programming problem and a detailed plan, implement the solution in {self.language}. |
| | |
| | # Problem: |
| | {problem_description} |
| | |
| | # Detailed Plan: |
| | {final_plan} |
| | |
| | # Your Task: |
| | Write complete, working code that implements this plan. Ensure: |
| | - The code follows the plan's steps |
| | - All edge cases are handled |
| | - The code is clean and well-structured |
| | - It passes all test cases |
| | |
| | # Output Format: |
| | Provide only the code implementation, without additional explanations.""" |
| | } |
| | ] |
| | |
| | print(f"\n{'='*60}") |
| | print(f"[Code Generation] Implementing Solution") |
| | print(f"{'='*60}") |
| | |
| | code, pr_tok, com_tok = self.gpt_chat(messages) |
| | item['api_calls'] = item.get('api_calls', 0) + 1 |
| | |
| | |
| | self.pr_tok += pr_tok |
| | self.com_tok += com_tok |
| | |
| | try: |
| | code = self.parse_code(code) |
| | except IndexError as e: |
| | print(f"parse_code raised IndexError: {e}. Will retry final code generation.", flush=True) |
| | max_code_retries = 2 |
| | parsed_success = False |
| | for cretry in range(1, max_code_retries + 1): |
| | retry_raw, pr_tok_r, com_tok_r = self.gpt_chat( |
| | messages |
| | ) |
| | item['api_calls'] = item.get('api_calls', 0) + 1 |
| | self.pr_tok += pr_tok_r |
| | self.com_tok += com_tok_r |
| |
|
| | try: |
| | retry_parsed = self.parse_code(retry_raw) |
| | code = retry_parsed |
| | parsed_success = True |
| | self.log_response(retry_raw, f"final_code_retry_success-{cretry}", item) |
| | break |
| | except Exception as e2: |
| | print(f"Retry {cretry} parse_code failed: {e2}", flush=True) |
| | self.log_response(retry_raw, f"final_code_retry_failed-{cretry}", item) |
| |
|
| | if not parsed_success: |
| | print("Final code generation: retries exhausted, using default fallback code.", flush=True) |
| | lang = (self.language or "").lower() |
| | if 'python' in lang: |
| | code = 'print("")' |
| | elif 'java' in lang: |
| | code = 'public class Main { public static void main(String[] args) { } }' |
| | elif 'c++' in lang or 'cpp' in lang: |
| | code = 'int main() { return 0; }' |
| | elif re.search(r"\bc\b", lang): |
| | code = 'int main() { return 0; }' |
| | elif 'js' in lang or 'node' in lang or 'javascript' in lang: |
| | code = 'console.log("")' |
| | else: |
| | code = '' |
| | |
| | self.log_response(code, "final_code_fallback", item) |
| |
|
| | self.log_response(code, "Generated-Code", item) |
| | |
| | print(f"\nGenerated Code:\n{code[:300]}...") |
| | |
| | return code |
| | |
| | def evaluate_plan_confidence(self, item: dict, plan: str) -> float: |
| | """评估计划的置信度""" |
| | prompt = f"""Given a programming problem and a plan to solve it, evaluate the quality and completeness of the plan. |
| | |
| | # Problem: |
| | {item.get('prompt', item.get('description', ''))} |
| | |
| | # Plan: |
| | {plan} |
| | |
| | Rate the plan's quality on a scale of 0-100, where: |
| | - 90-100: Excellent plan, covers all cases, clear implementation steps |
| | - 70-89: Good plan, minor gaps but workable |
| | - 50-69: Acceptable plan, has some issues |
| | - 0-49: Poor plan, major gaps or errors |
| | |
| | Output ONLY a number between 0-100, nothing else.""" |
| |
|
| | messages = [{"role": "user", "content": prompt}] |
| | |
| | try: |
| | response, pr_tok, com_tok = self.gpt_chat(messages) |
| | |
| | |
| | self.pr_tok += pr_tok |
| | self.com_tok += com_tok |
| |
|
| | |
| | import re |
| | match = re.search(r'\b(\d+(?:\.\d+)?)\b', response) |
| | if match: |
| | confidence = float(match.group(1)) |
| | return min(100.0, max(0.0, confidence)) |
| | return 50.0 |
| | except Exception as e: |
| | if hasattr(self, 'verbose') and self.verbose: |
| | print(f"Failed to evaluate confidence: {e}") |
| | return 50.0 |
| |
|
| | def parse_code(self, response: str) -> str: |
| | if "```" not in response: |
| | return response |
| |
|
| | code_pattern = r'```((.|\n)*?)```' |
| | if "```Python" in response: |
| | code_pattern = r'```Python((.|\n)*?)```' |
| | if "```Python3" in response: |
| | code_pattern = r'```Python3((.|\n)*?)```' |
| | if "```python" in response: |
| | code_pattern = r'```python((.|\n)*?)```' |
| | if "```python3" in response: |
| | code_pattern = r'```python3((.|\n)*?)```' |
| | if "```C" in response: |
| | code_pattern = r'```C((.|\n)*?)```' |
| | if "```c" in response: |
| | code_pattern = r'```c((.|\n)*?)```' |
| | if "```C++" in response: |
| | code_pattern = r'```C\+\+((.|\n)*?)```' |
| | if "```c++" in response: |
| | code_pattern = r'```c\+\+((.|\n)*?)```' |
| | if "```Java" in response: |
| | code_pattern = r'```Java((.|\n)*?)```' |
| | if "```java" in response: |
| | code_pattern = r'```java((.|\n)*?)```' |
| | if "```Node" in response: |
| | code_pattern = r'```Node((.|\n)*?)```' |
| | if "```node" in response: |
| | code_pattern = r'```node((.|\n)*?)```' |
| | if "```Rust" in response: |
| | code_pattern = r'```Rust((.|\n)*?)```' |
| | if "```rust" in response: |
| | code_pattern = r'```rust((.|\n)*?)```' |
| | if "```PHP" in response: |
| | code_pattern = r'```PHP((.|\n)*?)```' |
| | if "```php" in response: |
| | code_pattern = r'```php((.|\n)*?)```' |
| | if "```Go" in response: |
| | code_pattern = r'```Go((.|\n)*?)```' |
| | if "```go" in response: |
| | code_pattern = r'```go((.|\n)*?)```' |
| | if "```Ruby" in response: |
| | code_pattern = r'```Ruby((.|\n)*?)```' |
| | if "```ruby" in response: |
| | code_pattern = r'```ruby((.|\n)*?)```' |
| | if "```C#" in response: |
| | code_pattern = r'```C#((.|\n)*?)```' |
| | if "```c#" in response: |
| | code_pattern = r'```c#((.|\n)*?)```' |
| | if "```csharp" in response: |
| | code_pattern = r'```csharp((.|\n)*?)```' |
| |
|
| | code_blocks = re.findall(code_pattern, response, re.DOTALL) |
| |
|
| | if type(code_blocks[-1]) == tuple or type(code_blocks[-1]) == list: |
| | code_str = "\n".join(code_blocks[-1]) |
| | elif type(code_blocks[-1]) == str: |
| | code_str = code_blocks[-1] |
| | else: |
| | code_str = response |
| |
|
| | return code_str |
| |
|
| |
|
| | def Reviewer_pangu1b(self,problem_prompt: str, plan: str, code: str, test_log: str, task_id: str = "unknown") -> Tuple[str, int, int]: |
| | """ |
| | Reviewer 角色:分析代码失败原因并提供修复计划。 |
| | |
| | Args: |
| | problem_prompt (str): 题目描述。 |
| | code (str):生成的代码。 |
| | test_log (str): 测试失败的日志报告。 |
| | task_id (str): 当前任务的ID,用于日志打印 (对应原代码中的 i)。 |
| | |
| | Returns: |
| | Tuple[str, int, int]: 返回 (分析结果, prompt_token消耗, completion_token消耗) |
| | """ |
| | |
| | |
| | reviewer_input = [ |
| | { |
| | "role": "user", |
| | "content": f"You are an expert programmer. The following code was generated to solve a problem but failed sample test cases.\n\n## Problem:\n{problem_prompt}\n\n## Plan:\n{plan}\n \n## Generated Code:\n```\n{code}\n```\n\n## Test Report:\n{test_log}\n\nPlease analyze why the code failed and provide a specific plan to fix it. Do not generate the full code, just the analysis and fix plan." |
| | } |
| | ] |
| | |
| | print(f"Input for Reviewer analysis: {task_id}") |
| |
|
| | try: |
| | |
| | |
| | pangu_model = Pangu() |
| | analysis, q_pr_tok, q_com_tok = pangu_model.prompt(reviewer_input) |
| | |
| | print(f"Reviewer Analysis: {analysis}", flush=True) |
| | return analysis, q_pr_tok, q_com_tok |
| |
|
| | except NameError: |
| | print("Error: Pangu model class not found. Skipping detailed analysis.") |
| | return "Code failed sample tests. Please check logic and edge cases.", 0, 0 |
| | |
| | except Exception as e: |
| | print(f"Error during Reviewer analysis: {e}") |
| | return "Code failed sample tests. Please check logic and edge cases.", 0, 0 |
| |
|
| |
|
| | def debugging(self, plan: list, code: str, item: dict, algorithm_prompt: str) -> str: |
| | passed = False |
| | planning, _, _ = plan |
| | |
| | |
| |
|
| | if type(self.data) == APPSDataset or type(self.data) == CodeContestDataset or type(self.data) == XCodeDataset: |
| | std_input_prompt = "## Note: Strictly follow the input and output format. The input should be taken from Standard input and output should be given to standard output. If you are writing a function then after the function definition take input using `input()` function then call the function with specified parameters and finally print the output of the function. Do not add extra print statement otherwise it will failed the test cases." |
| | else: |
| | std_input_prompt = "" |
| | |
| | for i in range(1, self.t + 1): |
| | passed, test_log = self.data.evaluate_sample_io( |
| | item, |
| | code, |
| | self.language |
| | ) |
| |
|
| | if passed: |
| | print(f"DEBUGGING: Test passed at round {i}") |
| | break |
| | |
| | problem_prompt = self.data.get_prompt(item) |
| |
|
| | |
| | analysis, pr_cost, com_cost = self.Reviewer_pangu1b( |
| | problem_prompt=problem_prompt, |
| | plan = planning, |
| | code=code, |
| | test_log=test_log, |
| | task_id=i |
| | ) |
| | print(f"pr_cost: {pr_cost}, com_cost: {com_cost}") |
| | |
| | self.pr_tok += pr_cost |
| | self.com_tok += com_cost |
| |
|
| | print(f"Input for improving code generation: {i}") |
| | input_for_improving_code = [ |
| | { |
| | "role": "user", |
| | "content": f" You are an expert competitive programmer. Your task is to fix the provided {self.language} code based on the Expert Analysis. The original code failed sample test cases.\n\n### Problem Description: \n{self.data.get_prompt(item)}\n ### Original Code (Buggy): \n```\n{code}\n```\n### Expert Analysis & Fix Plan:\n{analysis}\n\nImprove your code to solve the problem correctly based on this analysis.\n### Requirement: 1. Read the Expert Analysis carefully. 2. {std_input_prompt} 3. Generate the corrected code.\n\n----------------\nImportant:\n{std_input_prompt}\n## Your response must contain the modified planning and then the {self.language} code inside ``` block to solve this problem." } |
| | ] |
| |
|
| | print("\n\n________________________") |
| | print("Input for improving code generation: ") |
| | print(input_for_improving_code[0]['content'], flush=True) |
| |
|
| | response, pr_tok_1, com_tok_1 = self.gpt_chat( |
| | input_for_improving_code |
| | ) |
| | item['api_calls'] += 1 |
| |
|
| | self.pr_tok += pr_tok_1 |
| | self.com_tok += com_tok_1 |
| |
|
| | raw_code = deepcopy(code) |
| | try: |
| | |
| | code = self.parse_code(response) |
| | except IndexError as e: |
| | print(f"parse_code raised IndexError: {e}. Will retry final code generation.", flush=True) |
| | max_code_retries = 2 |
| | parsed_success = False |
| | for cretry in range(1, max_code_retries + 1): |
| | retry_raw, pr_tok_r, com_tok_r = self.gpt_chat( |
| | input_for_improving_code |
| | ) |
| | item['api_calls'] = item.get('api_calls', 0) + 1 |
| | self.pr_tok += pr_tok_r |
| | self.com_tok += com_tok_r |
| |
|
| | try: |
| | retry_parsed = self.parse_code(retry_raw) |
| | code = retry_parsed |
| | parsed_success = True |
| | self.log_response(retry_raw, f"final_code_retry_success-{cretry}", item) |
| | break |
| | except Exception as e2: |
| | print(f"Retry {cretry} parse_code failed: {e2}", flush=True) |
| | self.log_response(retry_raw, f"final_code_retry_failed-{cretry}", item) |
| |
|
| | if not parsed_success: |
| | print("Final code generation: retries exhausted, using raw code.", flush=True) |
| | code = raw_code |
| | self.log_response(code, "final_code_fallback", item) |
| | |
| | return code |
| |
|
| | def run_single_pass(self, item: dict): |
| | """执行单个问题的多智能体辩论流程""" |
| | self.pr_tok = 0 |
| | self.com_tok = 0 |
| | |
| | print("\n" + "="*80) |
| | print(f"Processing: {item.get('task_id', item.get('name', 'unknown'))}") |
| | print("="*80) |
| | |
| | agents = ["UA", "TA", "QA"] |
| | |
| | plans_history = {agent: [] for agent in agents} |
| | actual_rounds = 1 |
| | |
| | |
| | print(f"\n{'#'*80}") |
| | print(f"# ROUND 1: Initial Planning") |
| | print(f"{'#'*80}") |
| | |
| | current_plans = {} |
| | for agent in agents: |
| | plan = self.generate_initial_plan(item, agent) |
| | plans_history[agent].append(plan) |
| | current_plans[agent] = plan |
| | |
| | |
| | if self.rounds > 1: |
| | confidences = {} |
| | print(f"\n{'='*60}") |
| | print("Evaluating Round 1 Plan Quality...") |
| | print(f"{'='*60}") |
| | |
| | for agent in agents: |
| | conf = self.evaluate_plan_confidence(item, current_plans[agent]) |
| | confidences[agent] = conf |
| | print(f"{agent} Plan Confidence: {conf:.1f}%") |
| | |
| | avg_confidence = sum(confidences.values()) / len(confidences) |
| | print(f"Average Confidence: {avg_confidence:.1f}%") |
| | |
| | if avg_confidence >= self.early_stop_threshold: |
| | print(f"\n✓ High confidence achieved! Skipping remaining rounds.") |
| | item['actual_rounds'] = actual_rounds |
| | item['early_stopped'] = True |
| | item['final_confidence'] = avg_confidence |
| | |
| | final_plan = self.fuse_plans(item, current_plans) |
| | code = self.generate_code_from_plan(item, final_plan) |
| | |
| | |
| | print(f"\n{'#'*80}") |
| | print(f"# DEBUGGING (Early Stop)") |
| | print(f"{'#'*80}") |
| | code = self.debugging([final_plan, None, None], code, item, "Please strictly follow the plan.") |
| | |
| | return code, self.pr_tok, self.com_tok |
| | |
| | |
| | for round_num in range(2, self.rounds + 1): |
| | actual_rounds = round_num |
| | print(f"\n{'#'*80}") |
| | print(f"# ROUND {round_num}: Debate and Refinement") |
| | print(f"{'#'*80}") |
| | |
| | new_plans = {} |
| | for agent in agents: |
| | other_plans = {a: current_plans[a] for a in agents if a != agent} |
| | refined_plan = self.generate_debate_plan( |
| | item, agent, round_num, |
| | current_plans[agent], other_plans |
| | ) |
| | plans_history[agent].append(refined_plan) |
| | new_plans[agent] = refined_plan |
| | |
| | current_plans = new_plans |
| | |
| | if round_num < self.rounds: |
| | confidences = {} |
| | print(f"\n{'='*60}") |
| | print(f"Evaluating Round {round_num} Plan Quality...") |
| | print(f"{'='*60}") |
| | |
| | for agent in agents: |
| | conf = self.evaluate_plan_confidence(item, current_plans[agent]) |
| | confidences[agent] = conf |
| | print(f"{agent} Plan Confidence: {conf:.1f}%") |
| | |
| | avg_confidence = sum(confidences.values()) / len(confidences) |
| | if avg_confidence >= self.early_stop_threshold: |
| | print(f"\n✓ High confidence achieved! Skipping remaining rounds.") |
| | item['actual_rounds'] = actual_rounds |
| | item['early_stopped'] = True |
| | item['final_confidence'] = avg_confidence |
| | break |
| | |
| | if 'actual_rounds' not in item: |
| | item['actual_rounds'] = actual_rounds |
| | item['early_stopped'] = False |
| | |
| | |
| | print(f"\n{'#'*80}") |
| | print(f"# FUSION: Combining All Perspectives") |
| | print(f"{'#'*80}") |
| | final_plan = self.fuse_plans(item, current_plans) |
| | |
| | |
| | print(f"\n{'#'*80}") |
| | print(f"# CODE GENERATION") |
| | print(f"{'#'*80}") |
| | code = self.generate_code_from_plan(item, final_plan) |
| |
|
| | |
| | print(f"\n{'#'*80}") |
| | print(f"# DEBUGGING") |
| | print(f"{'#'*80}") |
| | code = self.debugging([final_plan, None, None], code, item, "Please strictly follow the plan.") |
| | |
| | print(f"\n{'='*80}") |
| | print(f"Completed: {item.get('task_id', item.get('name', 'unknown'))}") |
| | |
| | |
| | return code, self.pr_tok, self.com_tok |