""" IPO Triple Extractor AZR Python Executor 기반 (Input, Program, Output) 트리플 추출 시스템 요구사항 2: "AZR Python Executor를 이용하여 (i,p,o) pair를 만든다" """ import ast import re import json from typing import Dict, List, Any, Tuple, Optional from concurrent.futures import TimeoutError from ..utils.code_utils.python_executor import PythonExecutor from .config import TestTimeConfig from .logger import TestTimeLogger from .solution_generator import InitialSolutionGenerator class IPOBuffer: """IPO triple을 저장하고 관리하는 버퍼""" def __init__(self): self.buffer = {} # {problem_id: [ipo_triples]} def add(self, problem_id: str, ipo_triple: Dict[str, Any]): """IPO triple을 버퍼에 추가""" if problem_id not in self.buffer: self.buffer[problem_id] = [] self.buffer[problem_id].append(ipo_triple) def get_all(self, problem_id: str) -> List[Dict[str, Any]]: """특정 문제의 모든 IPO triple 반환""" return self.buffer.get(problem_id, []) def clear(self, problem_id: str = None): """버퍼 초기화""" if problem_id: self.buffer.pop(problem_id, None) else: self.buffer.clear() def size(self, problem_id: str = None) -> int: """버퍼 크기 반환""" if problem_id: return len(self.buffer.get(problem_id, [])) return sum(len(triples) for triples in self.buffer.values()) class IPOTripleExtractor: """(Input, Program, Output) 트리플 추출 및 검증""" def __init__(self, config: TestTimeConfig, logger: Optional[TestTimeLogger] = None, model=None, tokenizer=None): self.config = config self.logger = logger or TestTimeLogger() self.model = model self.tokenizer = tokenizer # AZR Python Executor 초기화 (기존 방식) self.executor = PythonExecutor( timeout_length=config.python_executor_timeout, ast_check=True, # AZR 기본 설정 max_workers=config.max_workers ) self.extracted_triples = [] # 입력 생성 프롬프트와 응답 저장용 self.last_generation_prompt = "" self.last_generation_response = "" # VLLM 배치 처리를 위한 참조 self.solution_generator = None def extract_triples(self, problem: Dict[str, Any], solution: str) -> List[Dict[str, Any]]: """벤치마크 문제와 솔루션에서 IPO 트리플 추출""" problem_id = problem.get('task_id', 'unknown') self.logger.log_info(f"🔍 Extracting IPO triples for {problem_id}") triples = [] try: # 1. 함수 정보 추출 (entry point 우선) entry_point = problem.get('entry_point', 'unknown') func_info = self._extract_function_info(solution, entry_point) if not func_info: self.logger.log_error(f"Failed to extract function info from solution") return [] # 2. 테스트 케이스에서 입력-출력 쌍 생성 (LLM 솔루션 기반) test_cases = self._extract_test_cases(problem, solution) # 3. 솔루션 실행으로 IPO 트리플 생성 for i, (test_input_str, expected_output) in enumerate(test_cases): if len(triples) >= self.config.max_ipo_triples: break # test_input_str에서 실제 인자 추출 (예: "strlen('')" -> "''") import re match = re.match(rf'{entry_point}\((.*)\)', test_input_str) if match: actual_args = match.group(1) else: actual_args = test_input_str # fallback triple = self._create_ipo_triple( func_info['full_code'], # 🔧 수정: 전체 코드 사용 (도우미 함수 포함) func_info, actual_args, # 실제 인자만 전달 expected_output, triple_id=f"{problem_id}_triple_{i}", full_input_str=test_input_str # 전체 입력 문자열도 전달 ) if triple: triples.append(triple) # 🔧 수정: Synthetic 트리플 생성 제거 (단일 예시만 사용하여 치팅 방지) # Synthetic 트리플 생성 로직을 제거하여 진짜 단일 예시만 사용 # 검증 및 로깅 validation_results = [self._validate_triple(triple) for triple in triples] self.logger.log_ipo_extraction(problem_id, triples, validation_results) # 유효한 트리플만 반환 valid_triples = [triple for triple, valid in zip(triples, validation_results) if valid] self.logger.log_info(f"✅ Extracted {len(valid_triples)}/{len(triples)} valid IPO triples") return valid_triples except Exception as e: self.logger.log_error(f"IPO extraction failed: {e}") return [] def _extract_function_info(self, solution: str, entry_point: str = None) -> Optional[Dict[str, str]]: """솔루션에서 함수 정보 추출 (entry point 우선)""" try: # 🔧 개선: Raw LLM response인지 확인하고 함수 코드 추출 processed_solution = solution if "LLM GENERATED SOLUTION:" in solution: self.logger.log_info("📝 Raw LLM response detected, extracting function code") processed_solution = self._extract_function_from_llm_response(solution) if not processed_solution: self.logger.log_error("Failed to extract function from LLM response") return None # AST로 함수 정의 파싱 tree = ast.parse(processed_solution) # 🔧 수정: Entry point 함수 우선 검색 target_function = None all_functions = [] for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): func_info = { 'name': node.name, 'args': [arg.arg for arg in node.args.args], 'signature': f"def {node.name}({', '.join([arg.arg for arg in node.args.args])}):", 'full_code': processed_solution } all_functions.append(func_info) # Entry point와 일치하는 함수 우선 선택 if entry_point and node.name == entry_point: target_function = func_info # 이 로그는 너무 자주 출력되므로 debug 레벨로 변경 self.logger.log_debug(f"🎯 Found entry point function: {entry_point}") break # Entry point 함수를 찾았으면 반환 if target_function: return target_function # Entry point를 찾지 못했으면 첫 번째 함수 반환 (기존 방식) if all_functions: self.logger.log_warning(f"⚠️ Entry point '{entry_point}' not found, using first function: {all_functions[0]['name']}") return all_functions[0] return None except Exception as e: self.logger.log_error(f"Function parsing failed: {e}") return None def _extract_function_from_llm_response(self, llm_response: str) -> str: """Raw LLM response에서 함수 코드 추출 (solution_generator와 동일한 로직)""" lines = llm_response.split('\n') solution_lines = [] in_solution = False # "LLM GENERATED SOLUTION:" 섹션 추출 (수정된 로직) for i, line in enumerate(lines): if "LLM GENERATED SOLUTION:" in line: in_solution = True continue elif in_solution: # "===============" 라인이 나오면 종료하되, 첫 번째 "==============="는 건너뛰기 if "===============" in line: # 실제 솔루션 라인들이 있는지 확인 if solution_lines and any(l.strip() for l in solution_lines): break else: # 아직 솔루션 라인이 없으면 계속 진행 (첫 번째 구분선 건너뛰기) continue solution_lines.append(line) if not solution_lines: return "" # 추출 실패시 빈 문자열 반환 extracted_solution = '\n'.join(solution_lines).strip() # 함수 정의와 import 추출 (solution_generator 로직과 동일) lines = extracted_solution.split('\n') import_lines = [] func_lines = [] in_function = False indent_level = 0 # 1. import 문 수집 for line in lines: stripped = line.strip() if (stripped.startswith('import ') or stripped.startswith('from ')) and not stripped.startswith('#'): import_lines.append(line) # 2. 함수 정의 찾기 for line in lines: if line.strip().startswith('def '): in_function = True func_lines = [line] indent_level = len(line) - len(line.lstrip()) elif in_function: if not line.strip() or (line.strip() and len(line) - len(line.lstrip()) > indent_level): func_lines.append(line) else: break # 3. import + function 결합 if func_lines: result_lines = import_lines + [''] + func_lines if import_lines else func_lines return '\n'.join(result_lines) else: return extracted_solution def _fix_humaneval_canonical_solution(self, problem: Dict[str, Any]) -> str: """HumanEval canonical solution 복원 (함수 시그니처 추가)""" canonical_code = problem.get('canonical_solution', '') entry_point = problem.get('entry_point', '') prompt = problem.get('prompt', '') # HumanEval인지 확인 task_id = problem.get('task_id', '') if not task_id.startswith('HumanEval/'): return canonical_code # 이미 함수 시그니처가 있는지 확인 if f"def {entry_point}" in canonical_code: return canonical_code try: # Prompt에서 함수 시그니처 추출 import re def_pattern = rf'def\s+{re.escape(entry_point)}\s*\([^)]*\)[^:]*:' match = re.search(def_pattern, prompt, re.MULTILINE) if match: function_signature = match.group(0) # Import 문도 추출 (있다면) import_lines = [] for line in prompt.split('\n'): stripped = line.strip() if (stripped.startswith('import ') or stripped.startswith('from ')) and not stripped.startswith('#'): import_lines.append(line) # 완전한 canonical solution 구성 if import_lines: complete_canonical = '\n'.join(import_lines) + '\n\n' + function_signature + canonical_code else: complete_canonical = function_signature + canonical_code self.logger.log_info(f"🔧 Fixed HumanEval canonical solution for {entry_point}") return complete_canonical else: self.logger.log_warning(f"⚠️ Could not extract function signature for {entry_point}") return canonical_code except Exception as e: self.logger.log_error(f"Failed to fix HumanEval canonical solution: {e}") return canonical_code def _extract_single_prompt_example(self, problem: Dict[str, Any]) -> Optional[Tuple[str, str]]: """🔧 새로운 메서드: 프롬프트의 단일 예시만 추출 (치팅 방지)""" try: # base_input의 첫 번째 항목을 단일 예시로 사용 if 'base_input' in problem and problem['base_input']: first_input = problem['base_input'][0] entry_point = problem['entry_point'] self.logger.log_info(f"📥 Using first base_input as single example: {first_input}") # 🔧 수정: HumanEval canonical solution 복원 canonical_code = self._fix_humaneval_canonical_solution(problem) if canonical_code: actual_output = self._execute_llm_solution(canonical_code, entry_point, first_input) if actual_output is not None: # 입력 문자열 형식 생성 if isinstance(first_input, list): if len(first_input) == 1 and isinstance(first_input[0], list): # [[args]] -> 단일 리스트 인자로 표시 input_str = repr(first_input[0]) elif len(first_input) == 1: # [단일인자] -> 단일인자 input_str = repr(first_input[0]) else: # [다중인자] -> 다중인자 input_str = ', '.join(repr(arg) for arg in first_input) else: input_str = repr(first_input) result = (input_str, str(actual_output)) self.logger.log_info(f"✅ Single example extracted: Input={input_str}, Output={actual_output}") return result else: self.logger.log_warning("❌ Failed to compute output with canonical solution") else: self.logger.log_warning("❌ No canonical solution available") else: self.logger.log_warning("❌ No base_input available") except Exception as e: self.logger.log_error(f"Single example extraction failed: {e}") return None def _extract_docstring_examples(self, prompt: str, func_name: str) -> List[Tuple[str, str]]: """docstring에서 >>> 예제 추출""" examples = [] lines = prompt.split('\n') i = 0 while i < len(lines): line = lines[i].strip() # >>> func_name(...) 패턴 찾기 if line.startswith('>>>') and func_name in line: # 입력 추출 input_line = line[3:].strip() # >>> 제거 # 다음 줄에서 출력 추출 if i + 1 < len(lines): output_line = lines[i + 1].strip() # 출력이 >>> 로 시작하지 않으면 출력값 if not output_line.startswith('>>>'): examples.append((input_line, output_line)) i += 2 continue i += 1 else: i += 1 return examples def _extract_test_cases(self, problem: Dict[str, Any], solution: str) -> List[Tuple[str, str]]: """docstring의 예제에서 테스트 케이스 추출 (치팅 방지)""" test_cases = [] func_name = problem.get('entry_point', 'unknown') problem_id = problem.get('task_id', '') # HumanEval과 MBPP 모두 docstring 예제만 사용 self.logger.log_info(f"🎯 Extracting docstring examples for {problem_id}") # 프롬프트에서 docstring 예제 추출 prompt = problem.get('prompt', '') examples = self._extract_docstring_examples(prompt, func_name) if examples: self.logger.log_info(f"📝 Found {len(examples)} docstring examples") for i, (input_str, expected_output) in enumerate(examples): try: # 입력 파싱 (func_name(args) 형태에서 args 추출) import ast # "func_name(args)" -> args 추출 if input_str.startswith(func_name + '(') and input_str.endswith(')'): args_str = input_str[len(func_name)+1:-1] # 안전한 평가를 위해 ast.literal_eval 사용 try: # 단일 인자인 경우 input_args = ast.literal_eval(args_str) if not isinstance(input_args, tuple): input_args = (input_args,) except: # 여러 인자인 경우 input_args = ast.literal_eval(f"({args_str})") # LLM 솔루션 실행 actual_output = self._execute_llm_solution(solution, func_name, list(input_args)) if actual_output is not None: test_cases.append((input_str, str(actual_output))) self.logger.log_info(f"✅ Example {i+1}: {input_str} -> {actual_output}") else: self.logger.log_warning(f"❌ Example {i+1} execution failed") except Exception as e: self.logger.log_error(f"Example {i+1} parsing failed: {e}") else: self.logger.log_warning(f"⚠️ No docstring examples found, falling back to first base_input") # docstring 예제가 없으면 첫 번째 base_input만 사용 (MBPP처럼) if 'base_input' in problem and problem['base_input']: inp_args = problem['base_input'][0] # 입력 문자열 생성 if isinstance(inp_args, list): args_str = ', '.join(repr(arg) for arg in inp_args) input_str = f"{func_name}({args_str})" else: input_str = f"{func_name}({repr(inp_args)})" actual_output = self._execute_llm_solution(solution, func_name, inp_args) if actual_output is not None: test_cases.append((input_str, str(actual_output))) self.logger.log_info(f"📊 Extracted {len(test_cases)} test cases from docstring examples") return test_cases def _execute_llm_solution(self, llm_solution: str, func_name: str, input_args) -> Optional[str]: """LLM 생성 솔루션을 실행하여 실제 출력 계산""" try: if not llm_solution or func_name == 'unknown': return None # 🔧 수정: 실행용 코드 구성 (MBPP+ 이중 리스트 처리) if isinstance(input_args, list): # MBPP+ 데이터가 이중 리스트로 감싸진 경우 처리 if len(input_args) == 1 and isinstance(input_args[0], list): # [[args]] -> 단일 리스트 인자로 전달 args_str = repr(input_args[0]) elif len(input_args) == 1: # [단일인자] -> 단일 인자로 전달 args_str = repr(input_args[0]) else: # [다중인자] -> 다중 인자로 전달 args_str = ', '.join(repr(arg) for arg in input_args) else: args_str = repr(input_args) execution_code = f""" {llm_solution} # Execute LLM solution try: result = {func_name}({args_str}) print(repr(result)) except Exception as e: print(f"EXECUTION_ERROR: {{e}}") """ # AZR Python Executor로 실행 output, status = self.executor.apply(execution_code) if 'error' in status.lower() or 'EXECUTION_ERROR' in output: return None # 출력에서 결과 추출 output_lines = output.strip().split('\n') if output_lines: result_line = output_lines[-1].strip() # repr()로 출력된 결과를 그대로 반환 return result_line return None except Exception as e: self.logger.log_error(f"LLM solution execution failed: {e}") return None def _create_ipo_triple(self, solution: str, func_info: Dict[str, str], test_input: str, expected_output: str, triple_id: str, full_input_str: str = None) -> Optional[Dict[str, Any]]: """IPO 트리플 생성 및 검증 (AZR Python Executor 사용)""" try: # 1. 솔루션 실행으로 실제 출력 확인 actual_output = self._execute_function(solution, func_info['name'], test_input) if actual_output is None: return None # 2. IPO 트리플 구성 triple = { 'id': triple_id, 'input': test_input, # 실제 인자만 저장 (예: "''", "3.5") 'full_input_str': full_input_str or f"{func_info['name']}({test_input})", # 전체 입력 문자열은 별도 필드에 'program': solution, # 이미 func_info['full_code']가 전달됨 'expected_output': expected_output, 'actual_output': actual_output, 'function_name': func_info['name'], 'function_args': func_info['args'], 'is_correct': str(actual_output) == str(expected_output), 'extraction_method': 'test_case' } return triple except Exception as e: self.logger.log_error(f"Triple creation failed for {triple_id}: {e}") return None def _execute_function(self, code: str, func_name: str, inputs: str) -> Optional[str]: """AZR Python Executor로 함수 실행""" try: # 실행용 코드 구성 (AZR 템플릿 스타일) execution_code = f""" {code} # Execute function with inputs try: result = {func_name}({inputs}) print(repr(result)) except Exception as e: print(f"EXECUTION_ERROR: {{e}}") """ # AZR 방식으로 실행 output, status = self.executor.apply(execution_code) if 'error' in status.lower() or 'EXECUTION_ERROR' in output: return None # 출력에서 결과 추출 output_lines = output.strip().split('\n') if output_lines: return output_lines[-1].strip() return None except Exception as e: self.logger.log_error(f"Function execution failed: {e}") return None # 🔧 제거: Synthetic 트리플 생성 메서드들 제거 # 단일 예시만 사용하여 치팅 방지 목적에 맞게 불필요한 메서드들 제거 def _validate_triple(self, triple: Dict[str, Any]) -> bool: """IPO 트리플 검증""" if not self.config.validate_triples: return True try: # 1. 기본 필드 존재 확인 required_fields = ['input', 'program', 'expected_output', 'function_name'] if not all(field in triple for field in required_fields): return False # 2. 코드 구문 검증 try: ast.parse(triple['program']) except SyntaxError: return False # 3. 재실행으로 일관성 검증 (AZR 방식) # 이제 triple['input']은 이미 실제 인자만 포함 actual_output = self._execute_function( triple['program'], triple['function_name'], triple['input'] ) if actual_output is None: return False # 4. 출력 일치 확인 return str(actual_output) == str(triple['expected_output']) except Exception as e: self.logger.log_error(f"Triple validation failed: {e}") return False def get_triple_statistics(self) -> Dict[str, Any]: """추출된 트리플 통계""" if not self.extracted_triples: return {"total": 0, "valid": 0, "invalid": 0} valid_count = sum(1 for triple in self.extracted_triples if triple.get('is_correct', False)) return { "total": len(self.extracted_triples), "valid": valid_count, "invalid": len(self.extracted_triples) - valid_count, "extraction_methods": { "test_case": sum(1 for t in self.extracted_triples if t.get('extraction_method') == 'test_case'), "synthetic": sum(1 for t in self.extracted_triples if t.get('extraction_method') == 'synthetic') } } def generate_diverse_inputs(self, problem: Dict[str, Any], solution: str, existing_examples: List[Tuple[str, str]]) -> List[Dict[str, Any]]: """LLM을 사용하여 다양한 입력 생성""" problem_id = problem.get('task_id', 'unknown') self.logger.log_info(f"🎲 Generating diverse inputs for {problem_id}") try: # 1. 함수 정보 추출 entry_point = problem.get('entry_point', 'unknown') func_info = self._extract_function_info(solution, entry_point) if not func_info: self.logger.log_error("Failed to extract function info for input generation") return [] # 2. 인자 타입 정보 추론 arg_type_info = self._infer_argument_types(func_info, existing_examples, solution) # 3. 프롬프트 생성 prompt = self._create_input_generation_prompt( problem_description=problem.get('prompt', ''), existing_examples=existing_examples, full_code=solution, arg_type_info=arg_type_info ) # 4. LLM으로 입력 생성 generated_inputs = self._call_llm_for_inputs(prompt, existing_examples, func_info, arg_type_info) # 5. 생성된 입력 검증 valid_inputs = self._validate_generated_inputs(generated_inputs, func_info, solution) self.logger.log_info(f"✅ Generated {len(valid_inputs)} valid diverse inputs") return valid_inputs except Exception as e: self.logger.log_error(f"Failed to generate diverse inputs: {e}") return [] def generate_diverse_inputs_batch(self, program_input_pairs: List[Dict[str, Any]]) -> Tuple[List[List[Dict[str, Any]]], List[Optional[Dict[str, Any]]]]: """배치로 여러 프로그램의 diverse input 생성""" if not self.solution_generator: self.logger.log_error("Solution generator not set for batch processing") return [], [] self.logger.log_info(f"🎲 Generating diverse inputs for {len(program_input_pairs)} programs (BATCH)") try: # 모든 프로그램의 입력 생성 프롬프트 생성 batch_prompts = [] program_contexts = [] for pair in program_input_pairs: problem = pair['problem'] solution = pair['solution'] existing_examples = pair['existing_examples'] # 함수 정보 추출 entry_point = problem.get('entry_point', 'unknown') func_info = self._extract_function_info(solution, entry_point) if not func_info: program_contexts.append(None) batch_prompts.append("") continue # 인자 타입 정보 추론 arg_type_info = self._infer_argument_types(func_info, existing_examples, solution) # 프롬프트 생성 prompt = self._create_input_generation_prompt( problem_description=problem.get('prompt', ''), existing_examples=existing_examples, full_code=solution, arg_type_info=arg_type_info ) batch_prompts.append(prompt) program_contexts.append({ 'func_info': func_info, 'solution': solution, 'problem': problem }) # VLLM 배치로 LLM 호출 if not batch_prompts or all(not p for p in batch_prompts): return [], [] self.logger.log_info(f"🔍 Sending {len(batch_prompts)} prompts to VLLM for input generation") self.logger.log_info(f"🔍 First prompt preview: {batch_prompts[0][:200]}..." if batch_prompts else "No prompts") # Input generation은 코드 생성이 아니므로 후처리 없이 원시 응답 사용 # generate_batch의 후처리(함수 추출 등)는 input generation에 부적합 batch_responses = self.solution_generator._generate_batch_with_vllm( batch_prompts, temperature=0.7 # Input generation에는 약간의 랜덤성 필요 ) self.logger.log_info(f"🔍 Received {len(batch_responses)} responses from VLLM") for i, response in enumerate(batch_responses[:2]): # 처음 2개만 로깅 self.logger.log_info(f"🔍 Response {i} preview: {response[:200]}...") # 각 응답을 파싱하여 입력 생성 batch_results = [] batch_generation_info = [] # 각 프로그램의 input generation 정보 저장 for i, (response, context) in enumerate(zip(batch_responses, program_contexts)): if context is None: batch_results.append([]) batch_generation_info.append(None) continue try: # 응답에서 입력 추출 generated_inputs = self._parse_llm_input_response( response, context['func_info'], context['problem'].get('task_id', 'unknown') ) # 디버깅: 파싱된 입력 개수 로깅 self.logger.log_info(f"🔍 Parsed {len(generated_inputs)} inputs from response {i}") if generated_inputs: self.logger.log_info(f"🔍 First parsed input: {generated_inputs[0]}") # 생성된 입력 검증 valid_inputs = self._validate_generated_inputs( generated_inputs, context['func_info'], context['solution'] ) # 디버깅: 검증 후 입력 개수 로깅 self.logger.log_info(f"🔍 {len(valid_inputs)} inputs passed validation from response {i}") batch_results.append(valid_inputs) # Input generation 정보 저장 generation_info = { 'prompt': batch_prompts[i] if i < len(batch_prompts) else '', 'llm_response': response, 'extracted_inputs': generated_inputs, 'valid_inputs': valid_inputs, 'existing_examples': program_input_pairs[i]['existing_examples'] if i < len(program_input_pairs) else [], 'function_info': context['func_info'], 'arg_type_info': self._infer_argument_types( context['func_info'], program_input_pairs[i]['existing_examples'] if i < len(program_input_pairs) else [], context['solution'] ) } batch_generation_info.append(generation_info) except Exception as e: self.logger.log_error(f"Failed to process batch item {i}: {e}") # 더 자세한 디버깅 정보 추가 self.logger.log_error(f"Response preview: {response[:200]}...") import traceback self.logger.log_error(f"Traceback: {traceback.format_exc()}") batch_results.append([]) # 에러 정보도 저장 batch_generation_info.append({ 'error': str(e), 'prompt': batch_prompts[i] if i < len(batch_prompts) else '', 'llm_response': response, 'traceback': traceback.format_exc() }) total_generated = sum(len(inputs) for inputs in batch_results) self.logger.log_info(f"✅ Generated {total_generated} diverse inputs across {len(program_input_pairs)} programs") # Return both inputs and generation info as a tuple return batch_results, batch_generation_info except Exception as e: self.logger.log_error(f"Batch input generation failed: {e}") return [], [] def _parse_llm_input_response(self, llm_response: str, func_info: Dict[str, Any], problem_id: str) -> List[Dict[str, Any]]: """LLM 응답에서 입력 예제 파싱""" self.logger.log_info(f"🔍 Parsing LLM response for {problem_id}, response length: {len(llm_response)}") try: # ```python ... ``` 블록에서 코드 추출 import re code_pattern = r'```python\n(.*?)\n```' matches = re.findall(code_pattern, llm_response, re.DOTALL) if not matches: self.logger.log_info("🔍 No code block found, searching for examples = [") # 블록이 없으면 전체 응답에서 examples = 찾기 if 'examples = [' in llm_response: start = llm_response.find('examples = [') # 균형잡힌 괄호 찾기 bracket_count = 0 end = start for i, char in enumerate(llm_response[start:]): if char == '[': bracket_count += 1 elif char == ']': bracket_count -= 1 if bracket_count == 0: end = start + i + 1 break if end > start: code = llm_response[start:end] self.logger.log_info(f"🔍 Found examples code: {code[:100]}...") exec_globals = {} exec(code, exec_globals) examples = exec_globals.get('examples', []) self.logger.log_info(f"🔍 Extracted {len(examples)} examples") return examples else: self.logger.log_info("🔍 No 'examples = [' found in response") else: # 코드 블록에서 examples 추출 self.logger.log_info(f"🔍 Found {len(matches)} code blocks") code = matches[0] self.logger.log_info(f"🔍 Code block preview: {code[:100]}...") exec_globals = {} exec(code, exec_globals) examples = exec_globals.get('examples', []) self.logger.log_info(f"🔍 Extracted {len(examples)} examples from code block") # examples가 dict가 아닌 경우 처리 if examples and len(examples) > 0: self.logger.log_info(f"🔍 First example type: {type(examples[0])}") if isinstance(examples[0], dict): # expected_output, description 등 불필요한 키 제거 cleaned_examples = [] for ex in examples: cleaned = {k: v for k, v in ex.items() if k not in ['expected_output', 'description']} if cleaned: # 빈 dict가 아닌 경우만 추가 cleaned_examples.append(cleaned) self.logger.log_info(f"🔍 Cleaned {len(cleaned_examples)} examples") return cleaned_examples return examples return [] except Exception as e: self.logger.log_error(f"Failed to parse generated examples for {problem_id}: {e}") import traceback self.logger.log_error(f"Traceback: {traceback.format_exc()}") return [] def _infer_argument_types(self, func_info: Dict[str, str], examples: List[Tuple[str, str]], solution: str) -> Dict[str, str]: """기존 예제와 AST 분석으로 인자 타입 추론""" arg_types = {} func_name = func_info['name'] arg_names = func_info['args'] # 1. AST에서 type annotation 추출 try: tree = ast.parse(solution) for node in ast.walk(tree): if isinstance(node, ast.FunctionDef) and node.name == func_name: for i, arg in enumerate(node.args.args): if i < len(arg_names) and arg.annotation: # Type annotation이 있는 경우 arg_types[arg_names[i]] = ast.unparse(arg.annotation) except: pass # 2. 기존 예제에서 타입 추론 if examples: for input_str, _ in examples: # "func_name(args)" 형태에서 args 추출 if input_str.startswith(func_name + '(') and input_str.endswith(')'): args_str = input_str[len(func_name)+1:-1] try: # 인자 파싱 parsed_args = eval(f"({args_str},)") if not isinstance(parsed_args, tuple): parsed_args = (parsed_args,) # 각 인자의 타입 추론 for i, arg_value in enumerate(parsed_args): if i < len(arg_names): arg_name = arg_names[i] arg_type = type(arg_value).__name__ # 특별한 케이스 처리 if isinstance(arg_value, list): if arg_value and all(isinstance(x, type(arg_value[0])) for x in arg_value): inner_type = type(arg_value[0]).__name__ arg_type = f"List[{inner_type}]" else: arg_type = "List" # 기존 타입과 병합 if arg_name not in arg_types: arg_types[arg_name] = arg_type except: pass # 3. 타입 정보 딕셔너리로 반환 # arg_types가 비어있으면 unknown 타입으로 채우기 for arg_name in arg_names: if arg_name not in arg_types: arg_types[arg_name] = "Any (type unknown)" return arg_types def _create_input_generation_prompt(self, problem_description: str, existing_examples: List[Tuple[str, str]], full_code: str, arg_type_info: Dict[str, str]) -> str: """입력 생성을 위한 프롬프트 생성""" # 모든 기존 예제를 포맷팅 examples_text = "" for i, (input_str, output_str) in enumerate(existing_examples): examples_text += f"Example {i+1}:\n" examples_text += f"Input: {input_str}\n" examples_text += f"Output: {output_str}\n\n" # arg_type_info를 문자열로 포맷팅 arg_type_text = "Argument types:\n" for arg, arg_type in arg_type_info.items(): arg_type_text += f"- {arg}: {arg_type}\n" prompt = f"""Given the following problem description and its Python function implementation, first analyze the types and valid ranges of the function arguments, then write **5 different example inputs** for the function that cover a diverse mix of typical (general) cases and edge/boundary cases. Problem Description: ''' {problem_description} ''' Existing Examples from Problem: {examples_text} Function Implementation: ```python {full_code} ``` {arg_type_text} Based on the existing examples above, generate 5 NEW diverse test inputs that are different from the existing ones. Each input should be a Python dict where: - Keys are the exact parameter names from the function signature - Values are appropriate test values for each parameter Format your response as: ```python examples = [ {{dict_with_all_function_parameters}}, # Description of this test case {{dict_with_all_function_parameters}}, # Description of this test case ... # Continue for all 5 examples ] ``` Ensure your examples include: - At least 2 typical/general cases - At least 2 edge/boundary cases - 1 special case (empty, zero, maximum values, etc.) - All examples should be DIFFERENT from the existing examples shown above""" return prompt def _call_llm_for_inputs(self, prompt: str, existing_examples: List[Tuple[str, str]], func_info: Dict[str, Any], arg_type_info: str) -> List[Dict[str, Any]]: """LLM을 호출하여 입력 생성 및 파싱""" # 프롬프트 저장 self.last_generation_prompt = prompt try: # Input 생성용 전용 LLM 호출 (temperature=0.5) if self.model is not None and self.tokenizer is not None: # VLLM 사용 확인 try: from vllm import LLM if isinstance(self.model, LLM): response = self._generate_with_vllm_for_inputs(prompt) else: response = self._generate_with_hf_for_inputs(prompt) except ImportError: response = self._generate_with_hf_for_inputs(prompt) # 응답 저장 self.last_generation_response = response # 응답에서 examples 추출 parsed_inputs = self._parse_generated_examples(response) # 입력 생성 정보 저장 self.last_input_generation_info = { 'prompt': prompt, 'llm_response': response, 'extracted_inputs': parsed_inputs, 'existing_examples': existing_examples, 'function_info': func_info, 'arg_type_info': arg_type_info } return parsed_inputs else: # 모델이 없으면 빈 리스트 반환 (테스트 환경) self.logger.log_warning("No model available for input generation") self.last_generation_response = "No model available" # 실패한 경우에도 정보 저장 self.last_input_generation_info = { 'prompt': prompt, 'llm_response': "No model available", 'extracted_inputs': [], 'existing_examples': existing_examples, 'function_info': func_info, 'arg_type_info': arg_type_info, 'error': "No model available" } return [] except Exception as e: self.logger.log_error(f"Failed to call LLM for inputs: {e}") self.last_generation_response = f"Error: {str(e)}" # 에러 발생 시에도 정보 저장 self.last_input_generation_info = { 'prompt': locals().get('prompt', 'N/A'), 'llm_response': f"Error: {str(e)}", 'extracted_inputs': [], 'existing_examples': locals().get('existing_examples', []), 'function_info': locals().get('func_info', {}), 'arg_type_info': locals().get('arg_type_info', 'N/A'), 'error': str(e) } return [] def _generate_with_vllm_for_inputs(self, prompt: str) -> str: """Input 생성용 VLLM 백엔드 (temperature=0.5로 다양성 확보)""" try: from vllm import SamplingParams # Input 생성용 높은 temperature 설정 sampling_params = SamplingParams( temperature=0.5, # 다양한 입력 생성을 위한 높은 temperature max_tokens=2048, top_p=0.95, # 다양성을 위해 top_p 사용 stop=["\n```\n"], # 코드 블록 종료 시 정지 ) outputs = self.model.generate([prompt], sampling_params, use_tqdm=False) return outputs[0].outputs[0].text.replace("\t", " ").strip() except Exception as e: self.logger.log_error(f"VLLM input generation failed: {e}") return "" def _generate_with_hf_for_inputs(self, prompt: str) -> str: """Input 생성용 HuggingFace 백엔드 (temperature=0.5로 다양성 확보)""" try: import torch # 토크나이저 처리 inputs = self.tokenizer(prompt, return_tensors='pt', truncation=True, max_length=4096) # attention mask 명시적으로 설정 if 'attention_mask' not in inputs: inputs['attention_mask'] = torch.ones_like(inputs['input_ids']) # 디바이스 이동 inputs = {k: v.to(self.model.device) for k, v in inputs.items()} with torch.no_grad(): # 메모리 정리 if torch.cuda.is_available(): torch.cuda.empty_cache() # Input 생성용 sampling 설정 outputs = self.model.generate( inputs['input_ids'], attention_mask=inputs['attention_mask'], max_new_tokens=2048, do_sample=True, # sampling 활성화 temperature=0.5, # 다양한 입력 생성을 위한 temperature top_p=0.95, # 다양성을 위해 top_p 사용 pad_token_id=self.tokenizer.eos_token_id, eos_token_id=self.tokenizer.eos_token_id ) # 응답 추출 response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) response = response[len(prompt):].strip() return response except Exception as e: self.logger.log_error(f"HuggingFace input generation failed: {e}") return "" def _parse_generated_examples(self, llm_response: str) -> List[Dict[str, Any]]: """LLM 응답에서 예제 파싱""" try: # ```python ... ``` 블록에서 코드 추출 import re code_pattern = r'```python\n(.*?)\n```' matches = re.findall(code_pattern, llm_response, re.DOTALL) if not matches: # 블록이 없으면 전체 응답에서 examples = 찾기 if 'examples = [' in llm_response: start = llm_response.find('examples = [') # 균형잡힌 괄호 찾기 bracket_count = 0 end = start for i, char in enumerate(llm_response[start:]): if char == '[': bracket_count += 1 elif char == ']': bracket_count -= 1 if bracket_count == 0: end = start + i + 1 break if end > start: code = llm_response[start:end] exec_globals = {} exec(code, exec_globals) return exec_globals.get('examples', []) else: # 코드 블록에서 examples 추출 code = matches[0] exec_globals = {} exec(code, exec_globals) return exec_globals.get('examples', []) return [] except Exception as e: self.logger.log_error(f"Failed to parse generated examples: {e}") return [] def _validate_generated_inputs(self, generated_inputs: List[Dict[str, Any]], func_info: Dict[str, str], solution: str) -> List[Dict[str, Any]]: """생성된 입력의 유효성 검증""" valid_inputs = [] func_name = func_info['name'] for i, input_dict in enumerate(generated_inputs): try: # 1. 필수 인자 확인 required_args = set(func_info['args']) provided_args = set(input_dict.keys()) if not required_args.issubset(provided_args): self.logger.log_warning(f"Input {i+1} missing required args: {required_args - provided_args}") continue # 2. 실제 실행으로 검증 # 인자를 순서대로 배열 args = [input_dict[arg] for arg in func_info['args'] if arg in input_dict] # 실행 테스트 output = self._execute_llm_solution(solution, func_name, args) if output is not None: valid_inputs.append(input_dict) self.logger.log_info(f"✅ Valid input {i+1}: {input_dict}") else: self.logger.log_warning(f"❌ Input {i+1} execution failed") except Exception as e: self.logger.log_error(f"Input {i+1} validation error: {e}") return valid_inputs def create_ipo_from_input(self, problem: Dict[str, Any], solution: str, input_dict: Dict[str, Any]) -> Optional[Dict[str, Any]]: """새로운 입력으로 IPO triple 생성""" try: problem_id = problem.get('task_id', 'unknown') entry_point = problem.get('entry_point', 'unknown') # 함수 정보 추출 func_info = self._extract_function_info(solution, entry_point) if not func_info: return None # 인자를 순서대로 배열 args = [input_dict[arg] for arg in func_info['args'] if arg in input_dict] # 실행하여 출력 얻기 output = self._execute_llm_solution(solution, func_info['name'], args) if output is None: return None # 입력 문자열 생성 args_str = ', '.join(repr(arg) for arg in args) full_input_str = f"{func_info['name']}({args_str})" # IPO triple 생성 triple_id = f"{problem_id}_generated_{len(self.extracted_triples)}" triple = { 'id': triple_id, 'input': args_str, # 실제 인자만 'full_input_str': full_input_str, # 전체 함수 호출 'program': solution, 'expected_output': output, 'actual_output': output, 'function_name': func_info['name'], 'function_args': func_info['args'], 'is_correct': True, # 생성된 것은 항상 정확 'extraction_method': 'generated' } return triple except Exception as e: self.logger.log_error(f"Failed to create IPO from input: {e}") return None def cleanup(self): """리소스 정리""" if hasattr(self.executor, 'cleanup'): self.executor.cleanup()