import json
import os
import re
import sys
import time
import xml.etree.ElementTree as ET
from copy import deepcopy
from typing import Any, List, Optional

import tiktoken

from .Base import BaseStrategy
from models.Base import BaseModel
from models.Pangu import Pangu

from datasets.Dataset import Dataset
from datasets.APPSDataset import APPSDataset
from datasets.MBPPDataset import MBPPDataset
from datasets.XCodeDataset import XCodeDataset
from datasets.HumanEvalDataset import HumanDataset
from datasets.CodeContestDataset import CodeContestDataset

from results.Results import Results
from evaluations.func_evaluate import evaluate_io
| |
|
# Spelled-out exemplar counts, keyed by the integer count (1-9). Each value
# is the English word followed by the zero-padded number in parentheses.
mapping = {
    number: f"{word} ({number:02d})"
    for number, word in enumerate(
        (
            "one",
            "two",
            "three",
            "four",
            "five",
            "six",
            "seven",
            "eight",
            "nine",
        ),
        start=1,
    )
}
| |
|
| | |
| |
|
| |
|
class MapCoder(BaseStrategy):
    """Four-stage prompting pipeline: retrieval, planning, coding, debugging.

    ``run_single_pass`` drives the stages for one dataset item:

    1. ``retrieval`` asks the model for exemplar problems plus an algorithm
       tutorial, returned as XML.
    2. ``planning`` derives one plan per exemplar and asks the model for a
       0-100 confidence score for each plan.
    3. ``code_generation`` turns a plan into source code.
    4. ``debugging`` runs the sample I/O and, on failure, asks for a fix.

    Attributes:
        k: Number of exemplars named in the retrieval prompt.
        t: Maximum number of repair iterations per plan.
        pr_tok: Running total of the second value returned by ``gpt_chat``
            (presumably prompt tokens -- confirm against ``BaseStrategy``).
        com_tok: Running total of the third value returned by ``gpt_chat``
            (presumably completion tokens -- confirm against
            ``BaseStrategy``).
    """

    # Lower-cased fence tags that mark a fenced block as program source.
    # Blocks carrying one of these tags are preferred over untagged blocks
    # (which are often sample output rather than code).
    _CODE_FENCE_TAGS = frozenset({
        "python", "python3", "py",
        "c", "c++", "cpp",
        "java",
        "node", "javascript", "js",
        "rust",
        "php",
        "go",
        "ruby",
        "c#", "csharp",
    })

    def __init__(
        self,
        k: int = 3,
        t: int = 5,
        pr_tok: int = 0,
        com_tok: int = 0,
        *args,
        **kwargs
    ):
        """Store the pipeline settings and initialise the token counters.

        Args:
            k: Number of exemplars to request during retrieval.
            t: Maximum number of debugging iterations per plan.
            pr_tok: Starting value of the ``pr_tok`` counter.
            com_tok: Starting value of the ``com_tok`` counter.
            *args: Forwarded to ``BaseStrategy.__init__``.
            **kwargs: Forwarded to ``BaseStrategy.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.k = k
        self.t = t
        # The counters used to be hard-wired to 0, silently discarding the
        # constructor arguments; the defaults keep the old behaviour.
        self.pr_tok = pr_tok
        self.com_tok = com_tok

    # ------------------------------------------------------------------
    # Response parsing helpers
    # ------------------------------------------------------------------

    def xml_to_dict(self, element):
        """Convert an ElementTree element into nested dicts.

        Children that have sub-elements are converted recursively; when such
        a tag repeats, the values are collected into a list. Leaf children
        map to their text, and a repeated leaf tag overwrites the earlier
        value.

        Args:
            element: The ``xml.etree.ElementTree.Element`` to convert.

        Returns:
            A dict keyed by child tag name.
        """
        result = {}
        for child in element:
            # len() counts sub-elements. Truth-testing an Element directly
            # is deprecated and will always be True in future versions.
            if len(child) == 0:
                result[child.tag] = child.text
                continue

            child_data = self.xml_to_dict(child)
            if child.tag not in result:
                result[child.tag] = child_data
            elif isinstance(result[child.tag], list):
                result[child.tag].append(child_data)
            else:
                result[child.tag] = [result[child.tag], child_data]
        return result

    def remove_before_root(self, response: str) -> str:
        """Drop everything before the first ``<root>`` tag, if there is one."""
        start_index = response.find('<root>')
        if start_index == -1:
            return response
        return response[start_index:]

    def parse_xml(self, response: str) -> dict:
        """Parse a model response that should contain a ``<root>`` document.

        Markdown fences are stripped first. If the text does not parse as-is,
        it is retried wrapped in ``<root>...</root>`` and then with only an
        opening ``<root>`` prepended.

        Args:
            response: Raw model output.

        Returns:
            The document converted with ``xml_to_dict``.

        Raises:
            xml.etree.ElementTree.ParseError: If every parsing attempt fails.
        """
        response = response.replace('```xml', '').replace('```', '')
        response = self.remove_before_root(response)

        try:
            root = ET.fromstring(response)
        except ET.ParseError:
            try:
                root = ET.fromstring('<root>\n' + response + '\n</root>')
            except ET.ParseError:
                # Last resort: a closing </root> exists but the opening tag
                # was lost. A failure here propagates to the caller.
                root = ET.fromstring('<root>\n' + response)

        parsed = self.xml_to_dict(root)
        for key in parsed:
            print(f"{key}")

        return parsed

    def parse_code(self, response: str) -> str:
        """Extract source code from a model response.

        A response without any fence marker is returned unchanged. Otherwise
        every complete fenced block is collected; the last block tagged with
        a known programming language wins, falling back to the last block of
        any kind. A leading language tag line is not part of the result.

        Args:
            response: Raw model output.

        Returns:
            The extracted code (or the whole response when it is unfenced).

        Raises:
            IndexError: If the response contains a fence marker but no
                complete fenced block. Callers rely on this to trigger a
                regeneration.
        """
        if "```" not in response:
            return response

        blocks = []
        # A single capture group keeps findall returning plain strings. With
        # two groups it returns tuples, and joining them appended the last
        # matched character to the code.
        for raw in re.findall(r"```(.*?)```", response, re.DOTALL):
            first_line, newline, rest = raw.partition("\n")
            tag = first_line.strip()
            if newline and re.fullmatch(r"[A-Za-z0-9_+#.\-]*", tag):
                # The opening line is a language tag (possibly empty).
                blocks.append((tag.lower(), rest))
            else:
                # Single-line fence, or code starts on the fence line.
                blocks.append(("", raw))

        if not blocks:
            raise IndexError("no complete fenced code block found in response")

        tagged = [body for tag, body in blocks if tag in self._CODE_FENCE_TAGS]
        if tagged:
            return tagged[-1]
        return blocks[-1][1]

    @staticmethod
    def trim_text(text: str, trimmed_text: str):
        """Remove every occurrence of ``trimmed_text`` and strip the result."""
        return text.replace(trimmed_text, '').strip()

    @staticmethod
    def replace_tag(text: str, tag: str):
        """Wrap the content of every ``<tag>`` element in a CDATA section.

        The text is returned untouched when it already contains both a
        CDATA-opening and a CDATA-closing form of the tag.

        NOTE(review): the "already wrapped" check is all-or-nothing, so a
        response in which only some ``<tag>`` elements use CDATA is left
        as-is.
        """
        if f'<{tag}><![CDATA[' in text and f']]></{tag}>' in text:
            return text
        opened = text.replace(f'<{tag}>', f'<{tag}><![CDATA[')
        return opened.replace(f'</{tag}>', f']]></{tag}>').strip()

    @staticmethod
    def get_sample_io_str(sample_io: Any) -> str:
        """Render sample test cases as prompt text.

        Args:
            sample_io: A sequence of strings, or a sequence of dicts with
                ``'input'`` and ``'output'`` keys. Only the first entry of
                ``'output'`` is shown (presumably a list of accepted
                outputs -- verify against the dataset loaders).

        Returns:
            The joined test cases; ``""`` for empty input; ``str(sample_io)``
            for any other shape.
        """
        if not sample_io:
            return ""
        first = sample_io[0]
        if isinstance(first, str):
            return "\n".join(sample_io)
        if isinstance(first, dict):
            return "\n".join(
                f"Input:\n{io['input']}\nExpected output:\n{io['output'][0]}"
                for io in sample_io
            )
        return str(sample_io)

    # ------------------------------------------------------------------
    # Logging and bookkeeping
    # ------------------------------------------------------------------

    def log_response(self, content: str, description: str, item: dict):
        """Append one record to the per-model response log (best effort).

        The log lives under ``<cwd>/outputs/responses``. Any failure is
        printed and swallowed so logging can never abort a run.

        Args:
            content: Text to record (coerced with ``str``).
            description: Short label describing the kind of record.
            item: Dataset item; its id is read via ``self.data.id_key``.
        """
        try:
            out_dir = os.path.join(os.getcwd(), "outputs", "responses")
            os.makedirs(out_dir, exist_ok=True)
            now = time.time()
            # A millisecond timestamp stands in for the id when unavailable.
            fallback_id = int(now * 1000)
            curtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
            if isinstance(item, dict):
                file_id = item.get(self.data.id_key, fallback_id)
            else:
                file_id = fallback_id
            log_path = os.path.join(
                out_dir,
                f"MapCoder_{self.model.__class__.__name__}_responses.log",
            )
            record = "".join([
                "---\n",
                f"# timestamp: {curtime}\n",
                f"# dataset: {self.data.__class__.__name__}\n",
                f"# id: {file_id}\n",
                f"# kind: {description}\n",
                str(content),
                "\n\n",
            ])
            with open(log_path, 'a', encoding='utf-8') as fw:
                fw.write(record)
        except Exception as e:
            print(f"Failed to append response to log file: {e}", flush=True)

    def _count_call(self, item: dict, pr_tok: int, com_tok: int) -> None:
        """Record one ``gpt_chat`` call on the item and the token counters."""
        item['api_calls'] = item.get('api_calls', 0) + 1
        self.pr_tok += pr_tok
        self.com_tok += com_tok

    def _std_input_prompt(self) -> str:
        """Return the stdin/stdout instruction for datasets that need it."""
        if isinstance(self.data, (APPSDataset, CodeContestDataset, XCodeDataset)):
            return "## Note: Strictly follow the input and output format. The input should be taken from Standard input and output should be given to standard output. If you are writing a function then after the function definition take input using `input()` function then call the function with specified parameters and finally print the output of the function. Do not add extra print statement otherwise it will failed the test cases."
        return ""

    # ------------------------------------------------------------------
    # Stage 1: retrieval
    # ------------------------------------------------------------------

    def _clean_retrieval_response(self, response: str) -> str:
        """Strip echoed template placeholders and CDATA-wrap free-text tags."""
        echoed_placeholders = (
            "# Identify the algorithm (Brute-force, Dynamic Programming, Divide-and-conquer, Greedy, Backtracking, Recursive, Binary search, and so on) that needs to be used to solve the original problem.",
            "# Write a useful tutorial about the above mentioned algorithms. Provide a high level generic tutorial for solving this types of problem. Do not generate code.",
            "# Planning to solve this problem:",
            # The template itself ends this placeholder with a period, so
            # the colon variant alone never matched an echoed line.
            "# Planning to solve this problem.",
            f"# Let's think step by step to solve this problem in {self.language} programming language.",
        )
        for placeholder in echoed_placeholders:
            response = self.trim_text(response, placeholder)
        for tag in ('algorithm', 'description', 'code', 'planning'):
            response = self.replace_tag(response, tag)
        return response

    def _query_retrieval(self, messages: list, item: dict) -> str:
        """Send the retrieval prompt once and return the cleaned response."""
        response, pr_tok, com_tok = self.gpt_chat(processed_input=messages)
        self._count_call(item, pr_tok, com_tok)
        return self._clean_retrieval_response(response)

    @staticmethod
    def _normalize_retrieval_output(parsed: dict) -> dict:
        """Validate parsed retrieval XML and coerce it to the expected shape.

        Raises:
            KeyError: If there is no ``problem`` entry.
            ValueError: If a problem is malformed.
        """
        problems = parsed["problem"]
        if isinstance(problems, dict):
            # A single <problem> element is parsed as a dict, not a list.
            problems = [problems]
        if not isinstance(problems, list):
            raise ValueError("Parsed 'problem' entry is not a list of problems.")
        for example_no, example in enumerate(problems, start=1):
            if not isinstance(example, dict):
                raise ValueError(f"Parsed problem example {example_no} is not a dict.")
            if 'description' not in example or 'planning' not in example:
                raise ValueError(f"Parsed problem example {example_no} missing 'description' or 'planning' fields.")
        parsed["problem"] = problems
        # Later stages index 'algorithm' unconditionally.
        if parsed.get("algorithm") is None:
            parsed["algorithm"] = ""
        return parsed

    def retrieval(self, item: dict) -> dict:
        """Ask the model for exemplar problems and an algorithm tutorial.

        The response is parsed as XML. On a parse or validation failure the
        same prompt is re-sent, up to three times.

        Args:
            item: Dataset item; ``item['api_calls']`` is incremented for
                every model call.

        Returns:
            A dict with a ``'problem'`` list (each entry has at least
            ``'description'`` and ``'planning'``) and an ``'algorithm'``
            string. If no response can be parsed, a placeholder with one
            empty problem and an empty algorithm is returned.
        """
        # Fall back to the bare number when k has no spelled-out form.
        count_phrase = mapping.get(self.k, str(self.k))
        input_kb_exemplars = [
            {
                "role": "user",
                "content": f"""Given a problem, provide relevant problems then identify the algorithm behind it and also explain the tutorial of the algorithm.
# Problem:
{self.data.get_prompt(item)}

# Exemplars:
Recall {count_phrase} relevant and distinct problems (different from problem mentioned above). For each problem,
1. describe it
2. generate {self.language} code step by step to solve that problem
3. finally generate a planning to solve that problem

# Algorithm:

----------------
Important:
Your response must follow the following xml format and you can only replace the line start with # inside the tags. Make sure all tags are closed and there is a single <root> element.

<root>
<problem>
# Recall {count_phrase} relevant and distinct problems (different from problem mentioned above). Write each problem in the following format.
<description>
# Describe the problem.
</description>
<code>
# Let's think step by step to solve this problem in {self.language} programming language.
</code>
<planning>
# Planning to solve this problem.
</planning>
</problem>

<problem>
# Recall {count_phrase} relevant and distinct problems (different from problem mentioned above). Write each problem in the following format.
<description>
# Describe the problem.
</description>
<code>
# Let's think step by step to solve this problem in {self.language} programming language.
</code>
<planning>
# Planning to solve this problem.
</planning>
</problem>

<problem>
# Recall {count_phrase} relevant and distinct problems (different from problem mentioned above). Write each problem in the following format.
<description>
# Describe the problem.
</description>
<code>
# Let's think step by step to solve this problem in {self.language} programming language.
</code>
<planning>
# Planning to solve this problem.
</planning>
</problem>

<algorithm>
# Identify the algorithm (Brute-force, Dynamic Programming, Divide-and-conquer, Greedy, Backtracking, Recursive, Binary search, and so on) that needs to be used to solve the original problem.
# Write a useful tutorial about the above mentioned algorithms. Provide a high level generic tutorial for solving this types of problem. Do not generate code.
</algorithm>
</root>
""",
            },
        ]

        print("\n\n________________________")
        print("Input for knowledge base and exemplars: ")
        print(input_kb_exemplars[0]['content'], flush=True)

        response = self._query_retrieval(input_kb_exemplars, item)

        print("\n\n________________________")
        print("Response from knowledge base and exemplars: ")
        print(response, flush=True)

        self.log_response(response, "Retrieval", item)

        max_parse_retries = 3
        # One initial parse plus up to max_parse_retries re-queries.
        for parse_attempt in range(1, max_parse_retries + 2):
            try:
                return self._normalize_retrieval_output(self.parse_xml(response))
            except Exception as e:
                # Any parse/validation problem is treated as "ask again".
                print(f"XML parse failed on attempt {parse_attempt}: {e}", flush=True)

            if parse_attempt > max_parse_retries:
                break

            response = self._query_retrieval(input_kb_exemplars, item)
            self.log_response(response, f"Retrieval-Retry-{parse_attempt}", item)

        print("Exceeded XML parse retries. Using default parsed value and continuing.", flush=True)
        return {'problem': [{'description': '', 'planning': ''}], 'algorithm': ''}

    # ------------------------------------------------------------------
    # Stage 2: planning
    # ------------------------------------------------------------------

    def _verify_planning(self, planning: str, item: dict) -> int:
        """Ask the model to score a plan and return the score in 0-100.

        The first integer found in the reply is used, clamped to 0-100. If
        no integer is found the question is repeated with a stricter
        instruction, for four attempts in total. If every attempt fails the
        score defaults to 100.
        """
        verification_base = f"Given a competitive programming problem and a plan to solve the problem in {self.language}, tell whether the plan is correct to solve this problem. # Problem:\n{self.data.get_prompt(item)}\n# Planning:\n{planning}\n Output: confidence score regarding the solvability of the problem\n Output Type: integer\n Output Range: 0-100\n Important: Your response must only contain the confidence score number, should not include any other explanations or words."

        print("Input for planning verification: ")
        print(verification_base, flush=True)

        max_confidence_retries = 3
        for conf_attempt in range(1, max_confidence_retries + 2):
            prompt_content = verification_base
            if conf_attempt > 1:
                prompt_content += (
                    "\n\nIMPORTANT: Reply with exactly one integer between 0 and 100. "
                    "Do not include any other words, punctuation, or formatting."
                )

            verification_res_raw, pr_tok_1, com_tok_1 = self.gpt_chat(
                [{"role": "user", "content": prompt_content}]
            )
            self._count_call(item, pr_tok_1, com_tok_1)

            print("Response from planning verification before parsing: ")
            print(verification_res_raw, flush=True)

            reply = str(verification_res_raw).strip()
            match = re.search(r"-?\d+", reply)
            if match:
                verification_score = min(100, max(0, int(match.group())))
                print("Response from planning verification after parsing: ")
                print(verification_score, flush=True)
                # Returning here keeps a score parsed on the final attempt;
                # it used to be overwritten by the default.
                return verification_score

            print(
                f"Verification parse failed on attempt {conf_attempt}: "
                f"No integer found in model response: {reply}",
                flush=True,
            )
            self.log_response(str(verification_res_raw), f"Verification-Retry-{conf_attempt}", item)

        # NOTE(review): an unscorable plan gets the maximum score, so it
        # sorts first -- confirm this default is intended.
        return 100

    def planning(self, retrieval_output: dict, item: dict, algorithm_prompt: str, sample_io_prompt: str) -> list[tuple]:
        """Create and score one plan per retrieved exemplar.

        Args:
            retrieval_output: Result of ``retrieval``.
            item: Dataset item; ``item['api_calls']`` is incremented for
                every model call.
            algorithm_prompt: Algorithm tutorial section for the prompt.
            sample_io_prompt: Sample test-case section for the prompt.

        Returns:
            A list of ``(planning_text, confidence, exemplar)`` tuples in
            exemplar order, where ``confidence`` is an int in 0-100.
        """
        plannings = []

        for example_no, example in enumerate(retrieval_output["problem"], start=1):
            example_problem = example["description"]
            example_planning = example["planning"]

            input_for_problem_planning = [
                {
                    "role": "user",
                    "content": f"Given a competitive programming problem generate a concrete planning to solve the problem.\n# Problem:\n{example_problem}\n# Planning:\n{example_planning}\n{algorithm_prompt}\n## Problem to be solved:\n{self.data.get_prompt(item)}\n{sample_io_prompt}\n## Planning:\n\n----------------\nImportant: You should give only the planning to solve the problem. Do not add extra explanation or words."
                }
            ]

            print("\n\n________________________")
            print(
                f"Input for our problem planning using example: {example_no}: ")
            print(input_for_problem_planning[0]['content'], flush=True)

            planning, pr_tok_1, com_tok_1 = self.gpt_chat(
                input_for_problem_planning
            )
            self._count_call(item, pr_tok_1, com_tok_1)

            print("\n\n________________________")
            print("Response from our problem planning: ")
            print(planning, flush=True)

            self.log_response(planning, f"Planning-Example-{example_no}", item)

            verification_res = self._verify_planning(planning, item)
            self.log_response(str(verification_res), "Verification", item)

            plannings.append((
                planning,
                verification_res,
                example
            ))

        return plannings

    # ------------------------------------------------------------------
    # Stage 3: code generation
    # ------------------------------------------------------------------

    def _regenerate_code(self, messages: list, item: dict, max_code_retries: int = 2) -> Optional[str]:
        """Re-send ``messages`` until a response yields parseable code.

        Returns:
            The parsed code of the first usable response, or ``None`` when
            every retry fails.
        """
        for cretry in range(1, max_code_retries + 1):
            retry_raw, pr_tok_r, com_tok_r = self.gpt_chat(messages)
            self._count_call(item, pr_tok_r, com_tok_r)

            try:
                retry_parsed = self.parse_code(retry_raw)
            except Exception as e2:
                print(f"Retry {cretry} parse_code failed: {e2}", flush=True)
                self.log_response(retry_raw, f"final_code_retry_failed-{cretry}", item)
            else:
                self.log_response(retry_raw, f"final_code_retry_success-{cretry}", item)
                return retry_parsed
        return None

    @staticmethod
    def _fallback_code(language) -> str:
        """Return a minimal do-nothing program for ``language``."""
        lang = (language or "").lower()
        if 'python' in lang:
            return 'print("")'
        # Must be tested before 'java': "javascript" contains "java".
        if 'js' in lang or 'node' in lang or 'javascript' in lang:
            return 'console.log("")'
        if 'java' in lang:
            return 'public class Main { public static void main(String[] args) { } }'
        if 'c++' in lang or 'cpp' in lang or re.search(r"\bc\b", lang):
            return 'int main() { return 0; }'
        return ''

    def code_generation(self, plan: list, item: dict, algorithm_prompt: str, sample_io_prompt: str) -> str:
        """Generate source code for one plan.

        If the response has an unterminated code fence the request is
        repeated up to two times; when that also fails a minimal placeholder
        program for ``self.language`` is returned.

        Args:
            plan: ``(planning_text, confidence, exemplar)`` from ``planning``.
            item: Dataset item; ``item['api_calls']`` is incremented for
                every model call.
            algorithm_prompt: Algorithm tutorial section for the prompt.
            sample_io_prompt: Sample test-case section for the prompt.

        Returns:
            The generated source code.
        """
        planning, confidence, example = plan
        std_input_prompt = self._std_input_prompt()

        input_for_final_code_generation = [
            {
                "role": "user",
                "content": f"Given a competitive programming problem generate {self.language} code to solve the problem.\n{algorithm_prompt}\n## Problem to be solved:\n{self.data.get_prompt(item)}\n## Planning:\n{planning}\n{sample_io_prompt}\n## Let's think step by step.\n\n----------------\nImportant:\n{std_input_prompt}\n## Your response must contain only the {self.language} code to solve this problem. Do not add extra explanation or words."
            }
        ]

        print("\n\n________________________")
        print("Input for final code generation: ")
        print(input_for_final_code_generation[0]['content'], flush=True)

        code, pr_tok_1, com_tok_1 = self.gpt_chat(
            input_for_final_code_generation
        )
        self._count_call(item, pr_tok_1, com_tok_1)

        try:
            code = self.parse_code(code)
        except IndexError as e:
            print(f"parse_code raised IndexError: {e}. Will retry final code generation.", flush=True)
            code = self._regenerate_code(input_for_final_code_generation, item)
            if code is None:
                print("Final code generation: retries exhausted, using default fallback code.", flush=True)
                code = self._fallback_code(self.language)
                self.log_response(code, "final_code_fallback", item)

        print("\n\n________________________")
        print("Response from final code generation: ")
        print(code, flush=True)
        self.log_response(code, "final_code", item)

        return code

    # ------------------------------------------------------------------
    # Stage 4: debugging
    # ------------------------------------------------------------------

    def _debug(self, plan: list, code: str, item: dict, algorithm_prompt: str) -> tuple:
        """Run the evaluate-and-repair loop and return ``(passed, code)``.

        Each of the up to ``self.t`` iterations evaluates the current code
        on the sample I/O. On failure a ``Pangu`` model analyses the failure
        and the main model is asked for improved code, which replaces
        ``code`` for the next iteration. ``passed`` is the result of the
        last evaluation that ran, so code produced by the final repair is
        returned without being re-evaluated.
        """
        passed = False
        planning, _, _ = plan
        std_input_prompt = self._std_input_prompt()

        for i in range(1, self.t + 1):
            passed, test_log = self.data.evaluate_sample_io(
                item,
                code,
                self.language
            )

            if passed:
                break

            # Rebuilt every iteration so the prompt shows the code that
            # actually produced test_log.
            plan_code_response = f"## Planning: {planning}\n## Code:\n```\n{code}\n```"

            pangu_input = [
                {
                    "role": "user",
                    "content": f"You are an expert programmer. The following code was generated to solve a problem but failed sample test cases.\n\n## Problem:\n{self.data.get_prompt(item)}\n\n## Generated Code:\n```\n{code}\n```\n\n## Test Report:\n{test_log}\n\nPlease analyze why the code failed and provide a specific plan to fix it. Do not generate the full code, just the analysis and fix plan."
                }
            ]

            print(f"Input for Pangu analysis: {i}")

            # NOTE(review): a fresh Pangu client is built per iteration, as
            # before; hoist it if the client is known to be stateless.
            pangu_model = Pangu()
            analysis, q_pr_tok, q_com_tok = pangu_model.prompt(pangu_input)
            self.pr_tok += q_pr_tok
            self.com_tok += q_com_tok

            print(f"Pangu Analysis: {analysis}", flush=True)

            print(f"Input for improving code generation: {i}")
            input_for_improving_code = [
                {
                    "role": "user",
                    "content": f"Given a competitive programming problem you have generated {self.language} code to solve the problem. But the generated code can not pass sample test cases.\n\nHere is an analysis of the failure and a fix plan provided by an expert:\n{analysis}\n\nImprove your code to solve the problem correctly based on this analysis.\n{algorithm_prompt}\n## Problem to be solved:\n{self.data.get_prompt(item)}\n{plan_code_response}\n## Test Report:\n{test_log}\n## Modified Planning:\n## Let's think step by step to modify {self.language} Code for solving this problem.\n\n----------------\nImportant:\n{std_input_prompt}\n## Your response must contain the modified planning and then the {self.language} code inside ``` block to solve this problem."
                }
            ]

            print("\n\n________________________")
            print("Input for improving code generation: ")
            print(input_for_improving_code[0]['content'], flush=True)

            response, pr_tok_1, com_tok_1 = self.gpt_chat(
                input_for_improving_code
            )
            self._count_call(item, pr_tok_1, com_tok_1)

            previous_code = code
            try:
                # Parse the model's new response. Parsing the old code here
                # meant repairs were never applied.
                code = self.parse_code(response)
            except IndexError as e:
                print(f"parse_code raised IndexError: {e}. Will retry final code generation.", flush=True)
                retried = self._regenerate_code(input_for_improving_code, item)
                if retried is None:
                    print("Final code generation: retries exhausted, using raw code.", flush=True)
                    code = previous_code
                    self.log_response(code, "final_code_fallback", item)
                else:
                    code = retried

            print("\n\n________________________")
            print("Response from improving code generation: ")
            print(response, flush=True)

            self.log_response(response, f"improving_code_attempt_{i}", item)

        return passed, code

    def debugging(self, plan: list, code: str, item: dict, algorithm_prompt: str) -> bool:
        """Run the repair loop and report whether the sample I/O passed.

        Kept for callers that only need the pass flag; ``run_single_pass``
        uses ``_debug`` so that it also receives the repaired code.

        Args:
            plan: ``(planning_text, confidence, exemplar)`` from ``planning``.
            code: Code to evaluate and repair.
            item: Dataset item; ``item['api_calls']`` is incremented for
                every ``gpt_chat`` call.
            algorithm_prompt: Algorithm tutorial section for the prompt.

        Returns:
            ``True`` if an evaluation of the sample I/O succeeded.
        """
        passed, _ = self._debug(plan, code, item, algorithm_prompt)
        return passed

    # ------------------------------------------------------------------
    # Driver
    # ------------------------------------------------------------------

    def run_single_pass(self, item: dict):
        """Run the whole pipeline for one dataset item.

        Plans are tried in descending confidence order; the first plan
        whose code passes the sample I/O ends the search.

        Args:
            item: Dataset item. Must contain ``'sample_io'``; its
                ``'api_calls'`` entry is updated.

        Returns:
            ``(code, pr_tok, com_tok)``: the code of the last plan tried
            (after any repairs) and the current token counters.

            NOTE(review): the counters live on the instance and are not
            reset here, so they accumulate across calls -- confirm the
            caller expects running totals.
        """
        print("", flush=True)

        retrieval_output = self.retrieval(item)

        algorithm_prompt = f"## Relevant Algorithm to solve the next problem:\n{retrieval_output.get('algorithm', '')}"
        sample_io_prompt = f"## Sample Test cases: \n{self.get_sample_io_str(item['sample_io'])}\n"

        plannings = self.planning(retrieval_output, item, algorithm_prompt, sample_io_prompt)
        plannings.sort(key=lambda x: x[1], reverse=True)

        # Defined up front so an empty plan list cannot leave it unbound.
        code = ""
        for plan in plannings:
            code = self.code_generation(plan, item, algorithm_prompt, sample_io_prompt)

            passed, code = self._debug(plan, code, item, algorithm_prompt)

            if passed:
                break

        print("________________________\n\n", flush=True)
        return code, self.pr_tok, self.com_tok
| |
|