#!/usr/bin/env python3
"""
Complete TestTime RLVR Pipeline Test Script

Tests the AZR-based TestTime RLVR pipeline on real benchmark problems and
verifies the full flow: LLM solution generation → IPO extraction → task
generation → LLM evaluation → reward computation.
"""

import os
import sys
import torch
import argparse
import json
from pathlib import Path
from datetime import datetime

# Import TestTime RLVR modules
sys.path.append('/home/ubuntu/RLVR/TestTime-RLVR-v2')

from absolute_zero_reasoner.testtime.complete_pipeline import CompleteTestTimePipeline
from absolute_zero_reasoner.testtime.config import TestTimeConfig, BenchmarkConfig
from absolute_zero_reasoner.testtime.logger import TestTimeLogger
from absolute_zero_reasoner.testtime.solution_generator import InitialSolutionGenerator
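# Example invocation (illustrative; the script filename is an assumption, but the
# flags below correspond to the argparse options defined in main() further down):
#   python test_complete_pipeline.py --benchmark humaneval --problem_id HumanEval/0 \
#       --model Qwen/Qwen2.5-7B --gpu 0 --output_dir ../tmp --verbose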

def load_test_problem():
    """Create a simple test problem (HumanEval style)."""
    return {
        'task_id': 'test/simple_sum',
        'prompt': '''def add_two_numbers(a, b):
    """
    Add two numbers and return the result.

    Args:
        a (int): First number
        b (int): Second number

    Returns:
        int: Sum of a and b

    Examples:
        >>> add_two_numbers(2, 3)
        5
        >>> add_two_numbers(-1, 1)
        0
        >>> add_two_numbers(0, 0)
        0
    """''',
        'entry_point': 'add_two_numbers',
        'canonical_solution': 'def add_two_numbers(a, b):\n    return a + b',
        'test': '''def check(candidate):
    assert candidate(2, 3) == 5
    assert candidate(-1, 1) == 0
    assert candidate(0, 0) == 0
    assert candidate(10, -5) == 5'''
    }


def save_detailed_results(result, args, output_dir):
    """Save detailed results to individual files."""
    # Build the directory structure from the benchmark name and problem ID
    benchmark = result.get('benchmark', 'unknown')
    problem_id = result['problem_id']  # keeps '/'
    problem_id_safe = problem_id.replace('/', '_')  # for file names

    # Create directories as {output_dir}/{benchmark}/{task_id}
    base_dir = os.path.join(output_dir, benchmark, problem_id_safe)
    os.makedirs(base_dir, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    # 1. Save the initial LLM solution (its attempt at the benchmark problem)
    if 'llm_generation' in result['steps']:
        llm_step = result['steps']['llm_generation']
        initial_solution_dir = os.path.join(base_dir, 'initial_solution')
        os.makedirs(initial_solution_dir, exist_ok=True)

        # Save the original benchmark problem
        if 'problem_loading' in result['steps']:
            problem_data = result['steps']['problem_loading'].get('problem', {})
            problem_file = os.path.join(initial_solution_dir, f"{problem_id_safe}_original_problem.txt")
            with open(problem_file, 'w', encoding='utf-8') as f:
                f.write(f"Problem ID: {result['problem_id']}\n")
                f.write(f"Benchmark: {result['benchmark']}\n")
                f.write(f"Generated: {timestamp}\n")
                f.write("="*80 + "\n")
                f.write("ORIGINAL BENCHMARK PROBLEM:\n")
                f.write("="*80 + "\n")
                f.write(problem_data.get('prompt', 'No prompt available'))
                f.write("\n" + "="*80 + "\n")
                f.write("ENTRY POINT:\n")
                f.write("="*80 + "\n")
                f.write(problem_data.get('entry_point', 'No entry point'))
                f.write("\n" + "="*80 + "\n")
                f.write("CANONICAL SOLUTION:\n")
                f.write("="*80 + "\n")
                f.write(problem_data.get('canonical_solution', 'No canonical solution'))
                if 'test' in problem_data:
                    f.write("\n" + "="*80 + "\n")
                    f.write("TEST CASES:\n")
                    f.write("="*80 + "\n")
                    f.write(str(problem_data['test']))

        # Save the LLM-generated solution
        llm_solution_file = os.path.join(initial_solution_dir, f"{problem_id_safe}_llm_solution.txt")
        with open(llm_solution_file, 'w', encoding='utf-8') as f:
            f.write(f"Problem ID: {result['problem_id']}\n")
            f.write(f"Benchmark: {result['benchmark']}\n")
            f.write(f"Generated: {timestamp}\n")
            f.write("="*80 + "\n")
            f.write("LLM GENERATED SOLUTION:\n")
            f.write("="*80 + "\n")
            f.write(llm_step.get('solution', 'No solution generated'))
            f.write("\n" + "="*80 + "\n")
            f.write("SYNTAX VALIDATION:\n")
            f.write("="*80 + "\n")
            syntax_valid = llm_step.get('syntax_valid', False)
            f.write(f"Valid: {'✅ YES' if syntax_valid else '❌ NO'}")
            if llm_step.get('syntax_error'):
                f.write(f"\nError: {llm_step['syntax_error']}")

            # Append the correctness evaluation of the initial solution
            f.write("\n" + "="*80 + "\n")
            f.write("SOLUTION CORRECTNESS EVALUATION:\n")
            f.write("="*80 + "\n")
            solution_eval = llm_step.get('solution_evaluation')
            if solution_eval:
                if solution_eval['correct']:
                    f.write(f"Result: ✅ CORRECT ({solution_eval['passed_tests']}/{solution_eval['total_tests']} tests passed)\n")
                else:
                    f.write(f"Result: ❌ INCORRECT ({solution_eval['passed_tests']}/{solution_eval['total_tests']} tests passed)\n")
                if solution_eval.get('error'):
                    f.write(f"Error: {solution_eval['error']}\n")

                # Detailed execution results
                if solution_eval.get('execution_results'):
                    f.write("\nExecution Details:\n")
                    for i, exec_result in enumerate(solution_eval['execution_results']):
                        f.write(f"  Test {i+1}:\n")
                        f.write(f"    Status: {exec_result.get('status', 'N/A')}\n")
                        if 'result' in exec_result:
                            f.write(f"    Result: {exec_result['result'][:100]}...\n")
            else:
                f.write("No evaluation performed (syntax error or no test cases)\n")

        # Save the program that was used for IPO extraction
        if 'ipo_extraction' in result['steps']:
            ipo_step = result['steps']['ipo_extraction']
            if 'extracted_program' in ipo_step:
                extracted_program_file = os.path.join(initial_solution_dir, f"{problem_id_safe}_extracted_program.py")
                with open(extracted_program_file, 'w', encoding='utf-8') as f:
                    f.write(f"# Problem ID: {result['problem_id']}\n")
                    f.write(f"# Benchmark: {result['benchmark']}\n")
                    f.write(f"# Generated: {timestamp}\n")
                    f.write(f"# Extracted from LLM solution for IPO generation\n\n")
                    f.write(ipo_step['extracted_program'])

        print(f"📁 Saved initial solution: {initial_solution_dir}/")

    # 2. Save IPO triples
    if 'ipo_extraction' in result['steps']:
        ipo_step = result['steps']['ipo_extraction']
        triples = ipo_step.get('triples', [])
        ipo_dir = os.path.join(base_dir, 'ipo_triples')
        os.makedirs(ipo_dir, exist_ok=True)

        for i, triple in enumerate(triples):
            triple_file = os.path.join(ipo_dir, f"{problem_id_safe}_triple_{i+1}.json")
            with open(triple_file, 'w', encoding='utf-8') as f:
                json.dump(triple, f, indent=2, ensure_ascii=False)

        print(f"📁 Saved IPO triples: {ipo_dir}/ ({len(triples)} files)")

    # 3. Save generated task prompts
    if 'task_generation' in result['steps']:
        task_step = result['steps']['task_generation']
        all_tasks = task_step.get('all_tasks', {})
        task_dir = os.path.join(base_dir, 'task_prompts')
        os.makedirs(task_dir, exist_ok=True)

        task_count = 0
        for task_type, tasks in all_tasks.items():
            for i, task in enumerate(tasks):
                task_file = os.path.join(task_dir, f"{problem_id_safe}_{task_type}_{i+1}.txt")
                with open(task_file, 'w', encoding='utf-8') as f:
                    f.write(f"Task Type: {task_type}\n")
                    f.write(f"Task ID: {task.get('task_id', 'N/A')}\n")
                    f.write(f"Generated: {timestamp}\n")
                    f.write("="*80 + "\n")
                    f.write("TASK PROMPT:\n")
                    f.write("="*80 + "\n")
                    f.write(task.get('prompt', 'No prompt available'))
                    f.write("\n" + "="*80 + "\n")
                    f.write("EXPECTED SOLUTION:\n")
                    f.write("="*80 + "\n")
                    f.write(task.get('expected_solution', 'No expected solution'))
                    f.write("\n" + "="*80 + "\n")
                    f.write("EVALUATION DATA:\n")
                    f.write("="*80 + "\n")
                    f.write(str(task.get('evaluation_data', 'No evaluation data')))
                task_count += 1

        print(f"📁 Saved task prompts: {task_dir}/ ({task_count} files)")
    # 4. Save LLM task responses
    if 'task_evaluation' in result['steps']:
        eval_step = result['steps']['task_evaluation']
        evaluations = eval_step.get('evaluations', {})
        response_dir = os.path.join(base_dir, 'llm_responses')
        os.makedirs(response_dir, exist_ok=True)

        response_count = 0
        for task_type, task_evals in evaluations.items():
            for i, evaluation in enumerate(task_evals):
                response_file = os.path.join(response_dir, f"{problem_id_safe}_{task_type}_{i+1}_response.txt")
                with open(response_file, 'w', encoding='utf-8') as f:
                    f.write(f"Task Type: {task_type}\n")
                    f.write(f"Task ID: {evaluation.get('task_id', 'N/A')}\n")
                    f.write(f"Generated: {timestamp}\n")
                    f.write("="*80 + "\n")
                    f.write("ORIGINAL PROMPT:\n")
                    f.write("="*80 + "\n")
                    f.write(evaluation.get('prompt', 'No prompt available'))
                    f.write("\n" + "="*80 + "\n")
                    f.write("LLM RESPONSE:\n")
                    f.write("="*80 + "\n")
                    f.write(evaluation.get('llm_response', 'No response'))
                    f.write("\n" + "="*80 + "\n")
                    f.write("EXPECTED SOLUTION:\n")
                    f.write("="*80 + "\n")
                    f.write(evaluation.get('expected_solution', 'No expected solution'))

                    # Append extracted-answer info (taken from the reward computation results)
                    if 'reward_computation' in result['steps']:
                        reward_step = result['steps']['reward_computation']
                        rewards = reward_step.get('rewards', {})
                        rewards_by_type = rewards.get('rewards_by_type', {})

                        # Find the reward entry for the current task
                        current_task_rewards = rewards_by_type.get(task_type, [])
                        current_reward = None
                        for reward in current_task_rewards:
                            if reward.get('task_id') == evaluation.get('task_id'):
                                current_reward = reward
                                break

                        if current_reward and 'extracted_answer' in current_reward:
                            f.write("\n" + "="*80 + "\n")
                            f.write("EXTRACTED ANSWER:\n")
                            f.write("="*80 + "\n")
                            f.write(current_reward['extracted_answer'])
                            f.write("\n" + "="*80 + "\n")
                            f.write("MATCH RESULT:\n")
                            f.write("="*80 + "\n")
                            match_result = "✅ CORRECT" if current_reward.get('basic_accuracy', 0) > 0 else "❌ INCORRECT"
                            f.write(f"{match_result} (Score: {current_reward.get('basic_accuracy', 0):.3f})")
                response_count += 1

        print(f"📁 Saved LLM responses: {response_dir}/ ({response_count} files)")

    # 4-1. Save extracted answers separately
    if 'reward_computation' in result['steps']:
        reward_step = result['steps']['reward_computation']
        rewards = reward_step.get('rewards', {})
        rewards_by_type = rewards.get('rewards_by_type', {})
        extracted_dir = os.path.join(base_dir, 'extracted_answers')
        os.makedirs(extracted_dir, exist_ok=True)

        extracted_count = 0
        for task_type, task_rewards in rewards_by_type.items():
            for reward in task_rewards:
                if 'extracted_answer' in reward:
                    task_id = reward.get('task_id', 'unknown')
                    extracted_file = os.path.join(extracted_dir, f"{problem_id_safe}_{task_type}_{task_id}_extracted.txt")
                    with open(extracted_file, 'w', encoding='utf-8') as f:
                        f.write(f"Task Type: {task_type}\n")
                        f.write(f"Task ID: {task_id}\n")
                        f.write(f"Generated: {timestamp}\n")
                        f.write("="*80 + "\n")
                        f.write("EXTRACTED ANSWER:\n")
                        f.write("="*80 + "\n")
                        f.write(reward['extracted_answer'])
                        f.write("\n" + "="*80 + "\n")
                        f.write("EXPECTED SOLUTION:\n")
                        f.write("="*80 + "\n")
                        f.write(reward['expected_solution'])
                        f.write("\n" + "="*80 + "\n")
                        f.write("MATCH RESULT:\n")
                        f.write("="*80 + "\n")
                        match_result = "✅ CORRECT" if reward.get('basic_accuracy', 0) > 0 else "❌ INCORRECT"
                        f.write(f"{match_result} (Score: {reward.get('basic_accuracy', 0):.3f})")
                    extracted_count += 1

        print(f"📁 Saved extracted answers: {extracted_dir}/ ({extracted_count} files)")
    # 5. Save answer comparison and reward results
    if 'reward_computation' in result['steps']:
        reward_step = result['steps']['reward_computation']
        rewards = reward_step.get('rewards', {})

        reward_file = os.path.join(base_dir, f"{problem_id_safe}_reward_analysis.json")
        with open(reward_file, 'w', encoding='utf-8') as f:
            json.dump(rewards, f, indent=2, ensure_ascii=False)

        # Save a human-readable reward summary
        summary_file = os.path.join(base_dir, f"{problem_id_safe}_reward_summary.txt")
        with open(summary_file, 'w', encoding='utf-8') as f:
            f.write(f"REWARD ANALYSIS SUMMARY\n")
            f.write(f"Problem: {result['problem_id']}\n")
            f.write(f"Benchmark: {result['benchmark']}\n")
            f.write(f"Generated: {timestamp}\n")
            f.write("="*80 + "\n")
            f.write(f"OVERALL STATISTICS:\n")
            f.write(f"- Total Tasks: {rewards.get('total_tasks', 0)}\n")
            f.write(f"- Average Reward: {rewards.get('average_reward', 0.0):.3f}\n")
            f.write("\n")
            f.write(f"REWARD BY TASK TYPE:\n")
            for task_type, avg_reward in rewards.get('reward_distribution', {}).items():
                f.write(f"- {task_type.title()}: {avg_reward:.3f}\n")
            f.write("\n")
            f.write(f"DETAILED TASK REWARDS:\n")
            for task_type, task_rewards in rewards.get('rewards_by_type', {}).items():
                f.write(f"\n{task_type.upper()} TASKS:\n")
                for reward in task_rewards:
                    f.write(f"  Task {reward['task_id']}: ")
                    f.write(f"Accuracy={reward['basic_accuracy']:.3f}, ")
                    f.write(f"Final={reward['final_reward']:.3f}\n")

        print(f"📁 Saved reward analysis: {reward_file}")
        print(f"📁 Saved reward summary: {summary_file}")

    # 6. Save the overall pipeline summary (made JSON-serializable)
    summary_file = os.path.join(base_dir, f"{problem_id_safe}_pipeline_summary.json")

    # Clean up the result so it can be serialized to JSON
    serializable_result = result.copy()

    # Remove the BenchmarkConfig object or convert it into a serializable form
    if 'steps' in serializable_result and 'problem_loading' in serializable_result['steps']:
        problem_data = serializable_result['steps']['problem_loading'].get('problem', {})
        if 'benchmark_config' in problem_data:
            # Convert the BenchmarkConfig object to a dictionary
            config_obj = problem_data['benchmark_config']
            problem_data['benchmark_config'] = {
                'name': config_obj.name,
                'data_path': config_obj.data_path,
                'problem_prefix': config_obj.problem_prefix,
                'max_problems': config_obj.max_problems,
                'test_timeout': config_obj.test_timeout
            }

    with open(summary_file, 'w', encoding='utf-8') as f:
        json.dump(serializable_result, f, indent=2, ensure_ascii=False)

    print(f"📁 Saved pipeline summary: {summary_file}")
    print(f"\n📂 All result files saved under: {output_dir}")


def main():
    parser = argparse.ArgumentParser(description='Test Complete TestTime RLVR Pipeline')
    parser.add_argument('--model', type=str, default='Qwen/Qwen2.5-7B',
                        help='Model name to test with')
    parser.add_argument('--gpu', type=int, default=0,
                        help='GPU ID to use')
    parser.add_argument('--max_tokens', type=int, default=512,
                        help='Max tokens for generation')
    parser.add_argument('--benchmark', type=str, default='test',
                        choices=['test', 'humaneval', 'mbpp'],
                        help='Benchmark to use (test=example data, humaneval=HumanEval+, mbpp=MBPP+)')
    parser.add_argument('--problem_id', type=str, default='test/simple_sum',
                        help='Problem ID to test (e.g., HumanEval/0, Mbpp/2)')
    parser.add_argument('--output_dir', type=str, default='../tmp',
                        help='Output directory for detailed results')
    parser.add_argument('--verbose', action='store_true',
                        help='Verbose logging')
    args = parser.parse_args()

    # GPU setup
    device = f'cuda:{args.gpu}' if torch.cuda.is_available() else 'cpu'
    print(f"🎯 Using device: {device}")

    # TestTime configuration
    config = TestTimeConfig(
        model_name=args.model,
        max_adaptation_steps=3,
        learning_rate=1e-5,
        task_distribution={'induction': 0.4, 'deduction': 0.3, 'abduction': 0.3},
        adaptation_batch_size=1,
        max_tasks_per_type=3,
        use_flash_attention=False,  # disabled for small models
        torch_dtype=torch.float16,
        enable_gradient_checkpointing=False
    )

    # Benchmark configuration (data paths resolved as absolute paths)
    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    if args.benchmark == 'humaneval':
        benchmark_config = BenchmarkConfig.get_humaneval_config()
        benchmark_config.data_path = os.path.join(base_dir, 'evaluation/code_eval/data/HumanEvalPlus.jsonl')
    elif args.benchmark == 'mbpp':
        benchmark_config = BenchmarkConfig.get_mbpp_config()
        benchmark_config.data_path = os.path.join(base_dir, 'evaluation/code_eval/data/MbppPlus.jsonl')
    else:  # test
        benchmark_config = BenchmarkConfig(
            name='test_humaneval',
            data_path='test_data',
            problem_prefix='TestEval',
            max_problems=1,
            test_timeout=30
        )

    # Logger setup
    logger = TestTimeLogger(log_level='DEBUG' if args.verbose else 'INFO')
    logger.log_info("🚀 Starting Complete TestTime RLVR Pipeline Test")
    logger.log_info(f"📋 Model: {args.model}")
    logger.log_info(f"🎯 Device: {device}")

    try:
        # Load model and tokenizer (with vLLM optimization applied)
        logger.log_info("📦 Loading model and tokenizer with VLLM optimization...")
        model, tokenizer = InitialSolutionGenerator.load_model_with_optimizations(
            args.model, device, config,
            use_vllm=True  # enable vLLM optimization
        )
        logger.log_info("✅ Model loaded successfully")

        # Initialize the pipeline
        logger.log_info("🔧 Initializing pipeline...")
        pipeline = CompleteTestTimePipeline(model, tokenizer, config, logger)
        logger.log_info("✅ Pipeline initialized")

        # Set the problem ID
        problem_id = args.problem_id
        logger.log_info(f"📄 Testing with {args.benchmark} benchmark")
        logger.log_info(f"🔍 Problem ID: {problem_id}")

        # In test mode, use the example problem data
        if args.benchmark == 'test':
            test_problem = load_test_problem()
            logger.log_info(f"🔍 Problem preview: {test_problem['prompt'][:100]}...")
            # Temporarily inject the problem directly into the pipeline's benchmark_loader
            pipeline.benchmark_loader.load_problem = lambda cfg, pid: test_problem
        else:
            # Preview the prompt when using a real benchmark
            temp_problem = pipeline.benchmark_loader.load_problem(benchmark_config, problem_id)

            # Apply the AZR code evaluation prompt format
            azr_prompt = f"Please provide a self-contained Python script that solves the following problem in a markdown code block:\n\n{temp_problem.get('prompt', 'No prompt available')}"

            print(f"\n📋 **ORIGINAL PROBLEM:**")
            print("="*80)
            print(temp_problem.get('prompt', 'No prompt available'))
            print("="*80)
            print(f"\n📋 **AZR CODE EVALUATION PROMPT (the prompt actually used):**")
            print("="*80)
            print(azr_prompt)
            print("="*80)
            print(f"📌 Entry Point: {temp_problem.get('entry_point', 'N/A')}")
            print(f"📌 Task ID: {temp_problem.get('task_id', 'N/A')}")
            if 'test' in temp_problem:
                print(f"📌 Test Preview: {str(temp_problem['test'])[:200]}...")
            print("="*80)

        # Run the complete pipeline
        logger.log_info("🏃‍♂️ Running complete pipeline...")
        print("\n" + "="*60)
        print("🚀 COMPLETE TESTTIME RLVR PIPELINE EXECUTION")
        print(f"📋 Benchmark: {args.benchmark}")
        print(f"🔍 Problem: {problem_id}")
        print("="*60)

        result = pipeline.run_complete_pipeline(benchmark_config, problem_id)

        print("\n" + "="*60)
        print("📊 PIPELINE EXECUTION RESULTS")
        print("="*60)

        # Print overall results
        print(f"✅ Success: {result['success']}")
        if result['error']:
            print(f"❌ Error: {result['error']}")
        print(f"📋 Problem: {result['problem_id']}")
        print(f"🏷️ Benchmark: {result['benchmark']}")

        # Print per-step results
        for step_name, step_result in result['steps'].items():
            print(f"\n📍 Step: {step_name.replace('_', ' ').title()}")
            print(f"   Success: {'✅' if step_result['success'] else '❌'}")

            if step_name == 'llm_generation':
                solution = step_result.get('solution', '')
                print(f"   Solution preview: {solution[:100]}...")
                print(f"   Syntax valid: {'✅' if step_result.get('syntax_valid') else '❌'}")

                # Show the correctness evaluation of the initial solution
                eval_result = step_result.get('solution_evaluation')
                if eval_result:
                    if eval_result['correct']:
                        print(f"   ✅ Solution CORRECT ({eval_result['passed_tests']}/{eval_result['total_tests']} tests passed)")
                    else:
                        print(f"   ❌ Solution INCORRECT ({eval_result['passed_tests']}/{eval_result['total_tests']} tests passed)")
                    if eval_result.get('error'):
                        print(f"   Error: {eval_result['error'][:80]}...")

            elif step_name == 'ipo_extraction':
                print(f"   IPO triples extracted: {step_result.get('num_triples', 0)}")

            elif step_name == 'task_generation':
                print(f"   Total tasks generated: {step_result.get('total_tasks', 0)}")
                for task_type, count in step_result.get('tasks_by_type', {}).items():
                    print(f"   {task_type}: {count}")

            elif step_name == 'task_evaluation':
                evaluations = step_result.get('evaluations', {})
                total_evaluated = sum(len(evals) for evals in evaluations.values())
                print(f"   Tasks evaluated: {total_evaluated}")

            elif step_name == 'reward_computation':
                rewards = step_result.get('rewards', {})
                print(f"   Average reward: {rewards.get('average_reward', 0.0):.3f}")
                print(f"   Total tasks scored: {rewards.get('total_tasks', 0)}")

                # Show answer-extraction details
                for task_type, type_rewards in rewards.get('rewards_by_type', {}).items():
                    print(f"   {task_type.title()} Tasks:")
                    for reward in type_rewards[:2]:  # show only the first 2
                        print(f"     Task {reward['task_id']}: Expected='{reward['expected_solution'][:50]}...' | Extracted='{reward['extracted_answer'][:50]}...' | Match={'✅' if reward['basic_accuracy'] > 0 else '❌'}")

        # Show detailed results (verbose mode)
        if args.verbose and result['success']:
            print("\n" + "="*60)
            print("🔍 DETAILED RESULTS (VERBOSE MODE)")
            print("="*60)

            # IPO extraction details
            if 'ipo_extraction' in result['steps']:
                ipo_step = result['steps']['ipo_extraction']
                triples = ipo_step.get('triples', [])
                print(f"\n📊 IPO Triples ({len(triples)}):")
                for i, triple in enumerate(triples[:3]):  # show only the first 3
                    print(f"  [{i+1}] Input: {str(triple.get('input', 'N/A'))[:50]}...")
                    print(f"      Output: {str(triple.get('output', 'N/A'))[:50]}...")

            # Task generation details
            if 'task_generation' in result['steps']:
                task_step = result['steps']['task_generation']
                all_tasks = task_step.get('all_tasks', {})
                print(f"\n🎯 Generated Tasks:")
                for task_type, tasks in all_tasks.items():
                    print(f"  {task_type.title()} Tasks ({len(tasks)}):")
                    for i, task in enumerate(tasks[:2]):  # show only the first 2
                        prompt = task.get('prompt', '')
                        print(f"    [{i+1}] {prompt[:80]}...")

            # Reward distribution details
            if 'reward_computation' in result['steps']:
                reward_step = result['steps']['reward_computation']
                rewards = reward_step.get('rewards', {})
                distribution = rewards.get('reward_distribution', {})
                print(f"\n🏆 Reward Distribution:")
                for task_type, avg_reward in distribution.items():
                    print(f"  {task_type.title()}: {avg_reward:.3f}")

        print("\n" + "="*60)
        print("🎉 PIPELINE TEST COMPLETED SUCCESSFULLY")
        print("="*60)

        # Save detailed result files
        if result['success']:
            print(f"\n📁 Saving detailed result files...")
            save_detailed_results(result, args, args.output_dir)

        return result['success']

    except Exception as e:
        logger.log_error(f"💥 Pipeline test failed: {e}")
        import traceback
        traceback.print_exc()
        return False

    finally:
        # Free GPU memory
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        logger.log_info("🧹 Cleaned up resources")


if __name__ == '__main__':
    success = main()
    exit_code = 0 if success else 1
    print(f"\n🚪 Exiting with code {exit_code}")
    sys.exit(exit_code)