#!/usr/bin/env python3 """ TTRLVR + AZR 통합 학습 메인 스크립트 30라운드 반복 학습을 통해 TTRLVR 파이프라인과 AZR 학습을 통합: 1. 각 라운드마다 현재 모델로 (i,p,o) → induction/deduction/abduction tasks 생성 2. 해당 라운드 데이터로만 AZR 학습 (각 라운드의 task만 사용) 3. 개선된 모델로 다음 라운드 진행 4. 5라운드마다 체크포인트 저장 사용 예시: # 일반 학습 python train_ttrlvr_azr.py --benchmark mbpp --problems 10 --rounds 30 python train_ttrlvr_azr.py --benchmark humaneval --problems 5 --rounds 10 --resume 15 python train_ttrlvr_azr.py --benchmark mbpp --problem-id Mbpp/2 --rounds 5 --num-programs 8 --eval-rounds 10 python train_ttrlvr_azr.py --benchmark mbpp --problem-id Mbpp/2 --rounds 5 --skip-task-eval # Step 5 전용 모드 (기존 데이터로 VeRL 학습만) python train_ttrlvr_azr.py --step5-only --data-path /path/to/azr_training_data --gpu 1,2,3,0 --config configs/ttrlvr_azr_ppo_4gpu.yaml """ import os import sys import argparse import json from datetime import datetime from pathlib import Path from typing import List import warnings import signal import atexit # Gradient checkpointing 관련 경고 필터링 warnings.filterwarnings("ignore", message=".*Caching is incompatible with gradient checkpointing.*") # 경로 설정 sys.path.append('/home/ubuntu/RLVR/TestTime-RLVR-v2') # EvalPlus 경로 추가 (기존 TTRLVR 방식) sys.path.append('/home/ubuntu/RLVR/TestTime-RLVR-v2/evaluation/code_eval/coding') # TTRLVR 모듈 임포트 from absolute_zero_reasoner.testtime.config import TestTimeConfig, BenchmarkConfig from absolute_zero_reasoner.testtime.logger import TestTimeLogger from utils.iterative_trainer import IterativeTrainer # Ray 정리 변수 _trainer_instance = None _logger_instance = None def cleanup_ray(): """Ray 클러스터 정리 함수""" global _trainer_instance, _logger_instance try: if _logger_instance: _logger_instance.log_info("🔄 강제 종료 감지: Ray 클러스터 정리 중...") except: print("🔄 강제 종료 감지: Ray 클러스터 정리 중...") try: # IterativeTrainer 정리 if _trainer_instance: _trainer_instance.cleanup_ray() except Exception as e: try: if _logger_instance: _logger_instance.log_error(f"IterativeTrainer 정리 실패: {e}") except: print(f"IterativeTrainer 정리 실패: {e}") try: # 현재 프로그램의 Ray만 종료 (안전한 방법) import ray if ray.is_initialized(): ray.shutdown() except Exception as e: try: if _logger_instance: _logger_instance.log_error(f"Ray 종료 실패: {e}") except: print(f"Ray 종료 실패: {e}") try: if _logger_instance: _logger_instance.log_info("✅ Ray 정리 완료") except: print("✅ Ray 정리 완료") def signal_handler(signum, frame): """시그널 핸들러 (Ctrl+C, 강제 종료 등)""" try: if _logger_instance: _logger_instance.log_info(f"🛑 시그널 {signum} 수신: 프로그램 종료 중...") except: print(f"🛑 시그널 {signum} 수신: 프로그램 종료 중...") cleanup_ray() sys.exit(1) def parse_arguments(): """명령행 인자 파싱""" parser = argparse.ArgumentParser( description='TTRLVR + AZR 통합 반복 학습', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 예시: # MBPP 10문제로 30라운드 학습 python train_ttrlvr_azr.py --benchmark mbpp --problems 10 --rounds 30 # HumanEval 5문제로 10라운드 학습 python train_ttrlvr_azr.py --benchmark humaneval --problems 5 --rounds 10 # 15라운드부터 재개 python train_ttrlvr_azr.py --benchmark mbpp --problems 10 --rounds 30 --resume 15 # 특정 GPU 사용 python train_ttrlvr_azr.py --benchmark mbpp --problems 10 --rounds 30 --gpu 4 """ ) parser.add_argument( '--benchmark', choices=['mbpp', 'humaneval'], default='mbpp', help='벤치마크 선택 (기본값: mbpp)' ) parser.add_argument( '--problems', type=int, default=10, help='문제 수 (기본값: 10)' ) parser.add_argument( '--problem-id', type=str, help='특정 문제 ID (예: HumanEval/1, Mbpp/10)' ) parser.add_argument( '--rounds', type=int, default=30, help='총 라운드 수 (기본값: 30)' ) parser.add_argument( '--resume', type=int, default=1, help='재개할 라운드 번호 (기본값: 1)' ) parser.add_argument( '--gpu', type=str, default='5', help='사용할 GPU 번호 (단일: 5, 다중: 1,2,3,5)' ) parser.add_argument( '--output-dir', type=str, default='./results/ttrlvr_azr', help='결과 저장 디렉토리 (기본값: ./results/ttrlvr_azr)' ) parser.add_argument( '--config', type=str, help='설정 파일 경로 (선택사항)' ) parser.add_argument( '--model', type=str, default='Qwen/Qwen2.5-7B', help='사용할 모델 (기본값: Qwen/Qwen2.5-7B)' ) parser.add_argument( '--debug', action='store_true', help='디버그 모드 활성화' ) parser.add_argument( '--batch-size', type=int, default=24, help='학습 배치 크기 (기본값: 24, OOM 시 줄이기)' ) parser.add_argument( '--batch-epochs', type=int, default=1, help='배치당 에폭 수 (기본값: 1, 더 많은 학습을 위해 증가 가능)' ) parser.add_argument( '--num-programs', type=int, default=4, help='생성할 다양한 프로그램 수 (기본값: 4, 더 다양한 데이터를 위해 증가 가능)' ) parser.add_argument( '--input-generation-rounds', type=int, default=3, help='다양한 입력 생성 라운드 수 (기본값: 3, 라운드당 5개씩 생성)' ) parser.add_argument( '--parallel-batch-size', type=int, default=4, help='동시 처리할 프롬프트 수 (기본값: 4, GPU 메모리에 따라 조정)' ) parser.add_argument( '--eval-rounds', type=int, default=5, help='매 라운드 정확도 측정 횟수 (기본값: 5, 더 정확한 평가를 위해 증가 가능)' ) parser.add_argument( '--skip-task-eval', action='store_true', help='Task evaluation(4단계) 스킵하여 빠른 테스트 (데이터 생성 후 바로 VeRL 학습)' ) parser.add_argument( '--save-every-round', action='store_true', help='매 라운드마다 체크포인트 저장 (기본값: False)' ) parser.add_argument( '--save-round-interval', type=int, default=5, help='체크포인트 저장 간격 (예: 5 = 5라운드마다 저장, 기본값: 5)' ) parser.add_argument( '--step5-only', action='store_true', help='기존 데이터로 Step 5 (VeRL 학습)만 실행' ) parser.add_argument( '--data-path', type=str, help='Step5 전용 모드에서 사용할 기존 azr_training_data 디렉토리 경로' ) return parser.parse_args() def setup_environment(gpu_id: str, batch_size: int = None): """환경 변수 설정 - run_ttrlvr_azr_training.sh와 동일하게""" # GPU 설정 - 명령행 인자를 우선 사용하고, 없으면 기존 환경변수 사용 if gpu_id: os.environ['CUDA_VISIBLE_DEVICES'] = gpu_id print(f"🎯 Using command line GPU setting: {gpu_id}") elif 'CUDA_VISIBLE_DEVICES' in os.environ and os.environ['CUDA_VISIBLE_DEVICES']: print(f"🎯 Using existing CUDA_VISIBLE_DEVICES: {os.environ['CUDA_VISIBLE_DEVICES']}") else: os.environ['CUDA_VISIBLE_DEVICES'] = '5' # 기본값 print(f"🎯 Using default GPU: 5") # VLLM 설정 (run_ttrlvr_azr_training.sh와 동일) os.environ['VLLM_ATTENTION_BACKEND'] = 'FLASH_ATTN' # Ray 설정 (run_ttrlvr_azr_training.sh와 동일) os.environ['RAY_memory_monitor_refresh_ms'] = '0' os.environ['RAY_LOGGING_LEVEL'] = 'DEBUG' # Hydra 설정 os.environ['HYDRA_FULL_ERROR'] = '1' # Python 경로 설정 (verl 경로 추가) pythonpath = os.environ.get('PYTHONPATH', '') if '/home/ubuntu/RLVR/verl' not in pythonpath: os.environ['PYTHONPATH'] = f"{pythonpath}:/home/ubuntu/RLVR/verl:/home/ubuntu/RLVR/TestTime-RLVR-v2" # batch size 설정 if batch_size is not None: os.environ['TRAIN_BATCH_SIZE'] = str(batch_size) os.environ['VLLM_ATTENTION_BACKEND'] = 'FLASH_ATTN' os.environ['RAY_memory_monitor_refresh_ms'] = '0' os.environ['RAY_LOGGING_LEVEL'] = 'DEBUG' os.environ['HYDRA_FULL_ERROR'] = '1' os.environ['HF_HOME'] = '/data/.cache/huggingface' os.environ['TRANSFORMERS_CACHE'] = '/data/.cache/huggingface' os.environ['TOKENIZERS_PARALLELISM'] = 'false' # PYTHONPATH 설정 current_pythonpath = os.environ.get('PYTHONPATH', '') new_paths = [ '/home/ubuntu/RLVR/TestTime-RLVR-v2', '/data/miniforge3/envs/azr/lib/python3.10/site-packages' ] for path in new_paths: if path not in current_pythonpath: current_pythonpath = f"{path}:{current_pythonpath}" if current_pythonpath else path os.environ['PYTHONPATH'] = current_pythonpath def load_benchmark_problems(benchmark_config: BenchmarkConfig) -> List[str]: """벤치마크에서 문제 ID 목록 로드 (기존 TTRLVR 방식 사용)""" problems = [] if benchmark_config.name == 'mbpp': # MBPP+ EvalPlus 표준 데이터 로딩 try: from evalplus.data.mbpp import get_mbpp_plus mbpp_problems = get_mbpp_plus() # 자동으로 mbpp_deserialize_inputs 적용됨 problems = list(mbpp_problems.keys()) print(f"✅ MBPP+ 데이터 로드 성공: {len(problems)}개 문제 (EvalPlus 표준 방식)") except Exception as e: print(f"❌ MBPP+ EvalPlus 로딩 실패, 기존 방식 사용: {e}") # Fallback to original method data_path = benchmark_config.data_path if os.path.exists(data_path): with open(data_path, 'r') as f: for line in f: problem = json.loads(line.strip()) problems.append(problem['task_id']) elif benchmark_config.name == 'humaneval': # HumanEval+ EvalPlus 표준 데이터 로딩 try: from evalplus.data.humaneval import get_human_eval_plus humaneval_problems = get_human_eval_plus() problems = list(humaneval_problems.keys()) print(f"✅ HumanEval+ 데이터 로드 성공: {len(problems)}개 문제 (EvalPlus 표준 방식)") except Exception as e: print(f"❌ HumanEval+ EvalPlus 로딩 실패, 기존 방식 사용: {e}") # Fallback to original method data_path = benchmark_config.data_path if os.path.exists(data_path): with open(data_path, 'r') as f: for line in f: problem = json.loads(line.strip()) problems.append(problem['task_id']) return problems def create_problem_list(benchmark: str, num_problems: int, specific_problem_id: str = None) -> list: """벤치마크별 문제 ID 리스트 생성 (기존 TTRLVR 방식 사용)""" # BenchmarkConfig 생성 benchmark_config = create_benchmark_config(benchmark) # 전체 문제 목록 로드 all_problems = load_benchmark_problems(benchmark_config) if not all_problems: raise ValueError(f"No problems found for benchmark: {benchmark}") # 특정 문제 ID가 지정된 경우 if specific_problem_id: if specific_problem_id in all_problems: return [specific_problem_id] else: raise ValueError(f"Problem ID '{specific_problem_id}' not found in {benchmark} benchmark") # 요청된 수만큼 문제 선택 if num_problems <= 0 or num_problems > len(all_problems): return all_problems else: return all_problems[:num_problems] def create_config(args) -> TestTimeConfig: """TestTimeConfig 생성""" config = TestTimeConfig() # 기본 설정 config.model_name = args.model # 인자로 받은 모델 사용 config.max_new_tokens = 512 config.temperature = 0.05 config.baseline_evaluation_rounds = args.eval_rounds # 평가 횟수 # 프로그램 생성 설정 config.num_program_variations = args.num_programs # 다양한 프로그램 개수 config.input_generation_rounds = args.input_generation_rounds # 입력 생성 라운드 수 config.parallel_batch_size = args.parallel_batch_size # 동시 처리 프롬프트 수 # Task evaluation 스킵 설정 config.skip_task_evaluation = args.skip_task_eval # Task evaluation 스킵 여부 # 디버그 모드 if args.debug: config.debug = True config.verbose = True return config def create_benchmark_config(benchmark: str) -> BenchmarkConfig: """BenchmarkConfig 생성 (기존 TTRLVR 방식 사용)""" # 기존 TTRLVR 시스템과 동일한 방식으로 BenchmarkConfig 생성 base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if benchmark == 'mbpp': benchmark_config = BenchmarkConfig.get_mbpp_config() benchmark_config.data_path = os.path.join(base_dir, 'evaluation/code_eval/data/MbppPlus.jsonl') return benchmark_config elif benchmark == 'humaneval': benchmark_config = BenchmarkConfig.get_humaneval_config() benchmark_config.data_path = os.path.join(base_dir, 'evaluation/code_eval/data/HumanEvalPlus.jsonl') return benchmark_config else: raise ValueError(f"Unknown benchmark: {benchmark}") def save_run_config(args, problem_ids: list, output_dir: str): """실행 설정 저장""" config_data = { 'timestamp': datetime.now().isoformat(), 'benchmark': args.benchmark, 'num_problems': args.problems, 'total_rounds': args.rounds, 'resume_from': args.resume, 'gpu': args.gpu, 'problem_ids': problem_ids, 'output_dir': output_dir, 'command_line': ' '.join(sys.argv) } config_file = os.path.join(output_dir, 'run_config.json') with open(config_file, 'w') as f: json.dump(config_data, f, indent=2) return config_file def run_step5_only_mode(args): """Step 5 전용 모드 실행""" from pathlib import Path print(f"🎓 Running Step 5 (VeRL training) only mode") print(f"📂 Data path: {args.data_path}") # 데이터 경로 검증 data_path = Path(args.data_path) if not data_path.exists(): print(f"❌ Error: Data path does not exist: {data_path}") return 1 # 필수 파일들 확인 required_files = ['induction.parquet', 'deduction.parquet', 'abduction.parquet'] missing_files = [] for file_name in required_files: if not (data_path / file_name).exists(): missing_files.append(file_name) if missing_files: print(f"❌ Error: Missing required files: {missing_files}") return 1 print(f"✅ Found all required training data files in: {data_path}") # 파일 크기 정보 출력 for file_name in required_files: file_path = data_path / file_name file_size = file_path.stat().st_size print(f" 📄 {file_name}: {file_size:,} bytes") # 환경 설정 setup_environment(args.gpu, args.batch_size) # 설정 파일 경로 결정 config_path = args.config if not config_path: # GPU 개수에 따라 기본 설정 파일 선택 gpu_count = len(args.gpu.split(',')) if args.gpu else 1 if gpu_count >= 4: config_path = '/home/ubuntu/RLVR/TestTime-RLVR-v2/test/configs/ttrlvr_azr_ppo_4gpu.yaml' else: config_path = '/home/ubuntu/RLVR/TestTime-RLVR-v2/test/configs/ttrlvr_azr_ppo_1gpu.yaml' print(f"🚀 Initializing trainer with config: {config_path}") # TestTimeConfig 생성 (기존 create_config 함수 사용) config = create_config(args) # 로거 초기화 logger = TestTimeLogger() # IterativeTrainer 초기화 global _trainer_instance _trainer_instance = IterativeTrainer( config=config, logger=logger, verl_config_path=config_path ) # Step 5 전용 VeRL 학습 실행 try: result = _trainer_instance.run_verl_training_only( training_data_path=str(data_path), round_num=args.resume, # resume을 round number로 사용 experiment_name=f"step5_only_{args.benchmark}" ) if result.get('success', False): print(f"✅ VeRL training completed successfully!") print(f"⏱️ Duration: {result.get('duration', 'N/A')} seconds") if 'model_path' in result: print(f"🤖 Updated model: {result['model_path']}") return 0 else: print(f"❌ VeRL training failed: {result.get('error', 'Unknown error')}") return 1 except Exception as e: print(f"💥 Training failed with exception: {e}") import traceback traceback.print_exc() return 1 def main(): """메인 실행 함수""" global _trainer_instance, _logger_instance # 시그널 핸들러 등록 signal.signal(signal.SIGINT, signal_handler) # Ctrl+C signal.signal(signal.SIGTERM, signal_handler) # terminate atexit.register(cleanup_ray) # 정상 종료 시에도 정리 # 인자 파싱 args = parse_arguments() # Step 5 전용 모드 처리 if args.step5_only: if not args.data_path: print("❌ Error: --data-path is required when using --step5-only") return 1 return run_step5_only_mode(args) # 임시 문제 리스트 생성 (디렉토리명용) temp_problem_ids = create_problem_list(args.benchmark, args.problems, args.problem_id) actual_problem_count = len(temp_problem_ids) if temp_problem_ids else args.problems # 출력 디렉토리 생성 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') if args.problem_id: problem_desc = f"problem_{args.problem_id.replace('/', '_')}" else: problem_desc = f"{actual_problem_count}problems" output_dir = os.path.join( args.output_dir, f'{args.benchmark}_{problem_desc}_{args.rounds}rounds_{timestamp}' ) os.makedirs(output_dir, exist_ok=True) # 환경 설정 setup_environment(args.gpu, args.batch_size) # 로거 초기화 (로그 디렉토리 지정) log_dir = "/home/ubuntu/RLVR/TestTime-RLVR-v2/logs" os.makedirs(log_dir, exist_ok=True) logger = TestTimeLogger(log_dir=log_dir) _logger_instance = logger # 글로벌 변수에 할당 # 실제 로그 파일명 확인 (Logger가 생성한 파일명) log_handlers = [h for h in logger.logger.handlers if hasattr(h, 'baseFilename')] if log_handlers: actual_log_file = log_handlers[0].baseFilename print(f"📝 로그 파일: {actual_log_file}") else: print(f"📝 로그 디렉토리: {log_dir}") # 설정 생성 config = create_config(args) benchmark_config = create_benchmark_config(args.benchmark) # 문제 리스트 생성 problem_ids = create_problem_list(args.benchmark, args.problems, args.problem_id) # 실제 사용되는 값들로 로깅 actual_problem_count = len(problem_ids) if problem_ids else args.problems actual_gpu = os.environ.get('CUDA_VISIBLE_DEVICES', args.gpu) logger.log_info("🚀 TTRLVR + AZR 통합 학습 시작") logger.log_info("=" * 80) logger.log_info(f"📊 설정:") logger.log_info(f" - 벤치마크: {args.benchmark}") if args.problem_id: logger.log_info(f" - 특정 문제 ID: {args.problem_id}") logger.log_info(f" - 문제 수: {actual_problem_count} (특정 문제)") else: logger.log_info(f" - 문제 수: {actual_problem_count}") logger.log_info(f" - 총 라운드: {args.rounds}") logger.log_info(f" - GPU: {actual_gpu}") logger.log_info(f" - 출력 디렉토리: {output_dir}") if args.resume > 1: logger.log_info(f" - 재개 라운드: {args.resume}") logger.log_info("=" * 80) logger.log_info(f"🎯 문제 리스트: {problem_ids}") try: # 실행 설정 저장 config_file = save_run_config(args, problem_ids, output_dir) logger.log_info(f"📄 실행 설정 저장: {config_file}") # IterativeTrainer 초기화 (VeRL config 파일 경로 전달) verl_config_path = None if args.config: verl_config_path = os.path.abspath(args.config) trainer = IterativeTrainer( config, logger, batch_epochs=args.batch_epochs, verl_config_path=verl_config_path, save_every_round=args.save_every_round, save_round_interval=args.save_round_interval ) _trainer_instance = trainer # 글로벌 변수에 할당 # 반복 학습 실행 logger.log_info("🎓 반복 학습 시작") training_results = trainer.run_iterative_training( benchmark_config=benchmark_config, problem_ids=problem_ids, total_rounds=args.rounds, resume_from_round=args.resume ) # 최종 결과 저장 (JSON 직렬화 가능한 형태로 변환) results_file = os.path.join(output_dir, 'training_results.json') try: # BenchmarkConfig 객체를 dict로 변환 serializable_results = json.loads(json.dumps(training_results, default=str)) with open(results_file, 'w') as f: json.dump(serializable_results, f, indent=2) except Exception as e: logger.log_warning(f"결과 저장 중 오류 (무시됨): {e}") # 기본 정보만 저장 basic_results = { 'success': training_results.get('success', False), 'benchmark': args.benchmark, 'total_rounds': args.rounds, 'completed_rounds': len(training_results.get('rounds', {})), 'timestamp': training_results.get('end_time', 'unknown') } with open(results_file, 'w') as f: json.dump(basic_results, f, indent=2) logger.log_info(f"💾 최종 결과 저장: {results_file}") # Ray 클러스터 정리 trainer.cleanup() if training_results['success']: logger.log_info("🎉 TTRLVR + AZR 통합 학습 성공적으로 완료!") return 0 else: logger.log_error(f"❌ 학습 실패: {training_results.get('error', 'Unknown error')}") return 1 except KeyboardInterrupt: logger.log_info("⚠️ 사용자에 의해 중단됨") # 중단 시에도 cleanup if 'trainer' in locals(): trainer.cleanup() return 130 except Exception as e: logger.log_error(f"💥 예상치 못한 오류: {e}") # 오류 시에도 cleanup if 'trainer' in locals(): trainer.cleanup() return 1 finally: # finally 블록에서도 cleanup 보장 if 'trainer' in locals() and hasattr(trainer, 'cleanup'): trainer.cleanup() # 추가 Ray 프로세스 정리 import subprocess try: # Ray stop 명령어 실행 subprocess.run(['ray', 'stop', '--force'], capture_output=True, timeout=10) except: pass if __name__ == '__main__': exit_code = main() sys.exit(exit_code)