#!/usr/bin/env python3
"""
TTRLVR + AZR iterative training trainer.

Manages 30 rounds of iterative training; in each round:
1. The TTRLVR pipeline generates (i, p, o) -> tasks
2. AZR's CodeIORayPPOTrainer trains on that round's data
3. The improved model proceeds to the next round
"""
import os
import sys
import json
import pandas as pd
import ray
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional

# TTRLVR module imports
sys.path.append('/home/ubuntu/RLVR/TestTime-RLVR-v2')
from absolute_zero_reasoner.testtime.complete_pipeline import CompleteTestTimePipeline
from absolute_zero_reasoner.testtime.config import TestTimeConfig, BenchmarkConfig
from absolute_zero_reasoner.testtime.logger import TestTimeLogger

# Imports for running VeRL-based AZR
from utils.checkpoint_manager import CheckpointManager
from absolute_zero_reasoner.trainer.ppo.azr_ray_trainer import CodeIORayPPOTrainer
from utils.custom_ray_trainer import CustomCodeIORayPPOTrainer
import hydra
from hydra.core.global_hydra import GlobalHydra


class IterativeTrainer:
    """TTRLVR + AZR iterative training manager."""

    def __init__(self,
                 config: TestTimeConfig,
                 logger: Optional[TestTimeLogger] = None,
                 batch_epochs: int = 1,
                 verl_config_path: Optional[str] = None,
                 save_every_round: bool = False,
                 save_round_interval: int = 5):
        self.config = config
        self.logger = logger or TestTimeLogger()
        self.batch_epochs = batch_epochs  # epochs per batch
        self.verl_config_path = verl_config_path  # path to the VeRL config file
        self.save_every_round = save_every_round  # whether to save every round
        self.save_round_interval = save_round_interval  # save interval (rounds)

        # Detect GPU count and decide execution mode
        self.available_gpus = self._detect_available_gpus()
        self.execution_mode = self._determine_execution_mode()
        self.logger.log_info(f"🎯 Detected {len(self.available_gpus)} GPUs: {self.available_gpus}")
        self.logger.log_info(f"🎯 Execution mode: {self.execution_mode}")

        # Complete pipeline instance (lazy initialization)
        self.complete_pipeline = None

        # Checkpoint manager
        self.checkpoint_manager = CheckpointManager(logger=self.logger)

        # Training state tracking
        self.original_model_name = config.model_name  # original model name (for tokenizer loading)
        self.current_model_path = config.model_name   # model name taken from config
        self.current_model = None                     # current model instance (shared with VeRL)
        self.round_results = {}
        self.checkpoint_dir = "/data/RLVR/checkpoints/ttrlvr_azr"

        # Pipeline managed as a Ray actor (VeRL pattern)
        self.remote_pipeline = None

        # VeRL trainer instance (initialized once, then kept in memory)
        self.verl_trainer = None
        self.verl_config = None
        self.ray_initialized = False

        # Wall-clock bookkeeping
        self.start_time = None
        self.round_times = {}

    def cleanup(self):
        """Tear down the Ray cluster and related resources."""
        try:
            self.logger.log_info("🧹 Starting cleanup process...")

            # Clean up the VeRL trainer
            if hasattr(self, 'verl_trainer') and self.verl_trainer is not None:
                try:
                    self.logger.log_info("  - Cleaning up VeRL trainer...")
                    # Tear down the trainer's Ray actors
                    if hasattr(self.verl_trainer, 'shutdown'):
                        self.verl_trainer.shutdown()
                    self.verl_trainer = None
                except Exception as e:
                    self.logger.log_warning(f"  - VeRL trainer cleanup warning: {e}")

            # Kill the remote pipeline actor
            if self.remote_pipeline is not None:
                try:
                    self.logger.log_info("  - Killing remote pipeline actor...")
                    ray.kill(self.remote_pipeline)
                except Exception:
                    pass
                self.remote_pipeline = None

            # Shut down the Ray cluster
            if self.ray_initialized and ray.is_initialized():
                self.logger.log_info("  - Shutting down Ray cluster...")
                # Force-kill all named actors
                try:
                    # list_named_actors() returns the actor names in the current namespace
                    actor_names = ray.util.list_named_actors()
                    if actor_names:
                        self.logger.log_info(f"  - Found {len(actor_names)} named actors to kill")
                    for name in actor_names:
                        try:
                            ray.kill(ray.get_actor(name))
                        except Exception:
                            pass
                except Exception:
                    pass

                ray.shutdown()
                self.ray_initialized = False

                # Give Ray processes a moment to exit completely
                import time
                time.sleep(2)
                self.logger.log_info("✅ Ray cluster shutdown complete")

            # Free GPU memory
            try:
                import torch
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                    self.logger.log_info("  - GPU memory cleared")
            except Exception:
                pass

        except Exception as e:
            self.logger.log_error(f"Error during cleanup: {e}")
            # Still attempt to force Ray shutdown
            try:
                ray.shutdown()
            except Exception:
                pass

    def run_iterative_training(self,
                               benchmark_config: BenchmarkConfig,
                               problem_ids: List[str],
                               total_rounds: int = 30,
                               resume_from_round: int = 1) -> Dict[str, Any]:
        """Main loop for the 30-round iterative training."""
        self.start_time = datetime.now()
        # Create a single timestamp used for the whole session
        self.session_timestamp = self.start_time.strftime('%Y%m%d_%H%M%S')

        self.logger.log_info("🚀 Starting TTRLVR + AZR iterative training")
        self.logger.log_info(f"📊 Configuration: {len(problem_ids)} problems, {total_rounds} rounds")
        self.logger.log_info(f"🎯 Problems: {problem_ids}")
        self.logger.log_info(f"📁 Session timestamp: {self.session_timestamp}")

        # Resume from a checkpoint if requested
        if resume_from_round > 1:
            self.logger.log_info(f"🔄 Resuming from round {resume_from_round}")
            checkpoint_model = self._load_checkpoint(resume_from_round - 1)
            if checkpoint_model:
                self.current_model_path = checkpoint_model

        training_results = {
            'start_time': self.start_time.isoformat(),
            'session_timestamp': self.session_timestamp,
            'benchmark': benchmark_config.name,
            'problem_ids': problem_ids,
            'total_rounds': total_rounds,
            'resume_from_round': resume_from_round,
            'rounds': {},
            'success': False,
            'error': None
        }

        try:
            # Main iterative training loop
            for round_num in range(resume_from_round, total_rounds + 1):
                round_start_time = datetime.now()

                self.logger.log_info("=" * 80)
                self.logger.log_info(f"🔄 ROUND {round_num}/{total_rounds} - Starting")
                self.logger.log_info(f"🤖 Current model: {self.current_model_path}")
                self.logger.log_info("=" * 80)

                # Run a single round
                round_result = self._run_single_round(
                    benchmark_config, problem_ids, round_num
                )

                # Record the round result
                round_end_time = datetime.now()
                round_duration = (round_end_time - round_start_time).total_seconds()
                self.round_times[round_num] = round_duration

                round_result['duration_seconds'] = round_duration
                round_result['model_before'] = self.current_model_path
                training_results['rounds'][round_num] = round_result

                if not round_result['success']:
                    self.logger.log_error(f"❌ Round {round_num} failed: {round_result.get('error', 'Unknown error')}")
                    continue

                # Run AZR training
                if round_result['training_data_files']:
                    self.logger.log_info(f"🎓 Starting AZR training for round {round_num}")
                    new_model_path = self._train_azr_with_round_data(
                        round_result['training_data_files'],
                        round_num
                    )

                    if new_model_path:
                        self.current_model_path = new_model_path
                        round_result['model_after'] = new_model_path
                        self.logger.log_info(f"✅ Round {round_num} completed successfully")
                        self.logger.log_info(f"🎯 New model: {new_model_path}")

                        # ⭐ Also update the VLLM Ray actor's weights (true model sharing)
                        if hasattr(self, 'remote_pipeline') and self.remote_pipeline is not None:
                            self.logger.log_info("🔄 Updating VLLM Ray Actor weights with trained model")
                            update_success = ray.get(
                                self.remote_pipeline.update_model_weights.remote(new_model_path)
                            )
                            if update_success:
                                self.logger.log_info("✅ VLLM weights updated successfully for next round")
                            else:
                                self.logger.log_warning("⚠️ Failed to update VLLM weights, using old model")
                    else:
                        self.logger.log_error(f"❌ AZR training failed for round {round_num}")
                        round_result['training_error'] = "AZR training failed"

                # Save a checkpoint every 5 rounds
                if round_num % 5 == 0:
                    self._save_checkpoint(round_num, self.current_model_path, training_results)
                    self.logger.log_info(f"💾 Checkpoint saved for round {round_num}")

                # Per-round summary log
                self._log_round_summary(round_num, round_result, round_duration)

            # Training finished
            training_results['success'] = True
            training_results['end_time'] = datetime.now().isoformat()
            training_results['total_duration_seconds'] = (datetime.now() - self.start_time).total_seconds()
            training_results['final_model'] = self.current_model_path

            self.logger.log_info("🎉 TTRLVR + AZR iterative training completed successfully!")

            # Clean up the VeRL trainer
            if hasattr(self, 'verl_trainer') and self.verl_trainer is not None:
                self.logger.log_info("🧹 Cleaning up VeRL Trainer...")
                try:
                    # VeRL trainer cleanup (Ray, etc.)
                    if hasattr(self.verl_trainer, 'cleanup'):
                        self.verl_trainer.cleanup()
                    self.verl_trainer = None
                except Exception as cleanup_error:
                    self.logger.log_warning(f"Cleanup warning: {cleanup_error}")

            self._log_final_summary(training_results)
            return training_results

        except Exception as e:
            self.logger.log_error(f"💥 Iterative training failed: {e}")
            import traceback
            traceback.print_exc()
            return {
                'success': False,
                'error': str(e),
                'rounds': self.round_results
            }

    def run_verl_training_only(self,
                               training_data_path: str,
                               round_num: int = 1,
                               experiment_name: Optional[str] = None) -> Dict[str, Any]:
        """
        Run only the VeRL training step (Step 5).

        Performs VeRL PPO training on data generated in Steps 1-4.

        Args:
            training_data_path: Path to TTRLVR-generated training data (parquet files)
            round_num: Round number (for logging)
            experiment_name: Experiment name (optional)

        Returns:
            Dictionary with training results
        """
        try:
            self.logger.log_info("🚀 Starting VeRL training ONLY (Step 5)")
            self.logger.log_info("=" * 80)
            self.logger.log_info(f"📂 Training data path: {training_data_path}")
            self.logger.log_info(f"🔄 Round: {round_num}")

            # Load the VeRL config if needed
            if not hasattr(self, 'verl_config') or self.verl_config is None:
                self.logger.log_info("🔧 Loading VeRL config for standalone training")
                self._load_verl_config()

            # Validate the training data path
            if not os.path.exists(training_data_path):
                raise FileNotFoundError(f"Training data path not found: {training_data_path}")

            # Find parquet files
            parquet_files = list(Path(training_data_path).glob("*.parquet"))
            if not parquet_files:
                raise FileNotFoundError(f"No parquet files found in: {training_data_path}")

            # Update the VeRL config
            self.verl_config.data.train_files = [str(f) for f in parquet_files]
            self.verl_config.data.val_files = [str(f) for f in parquet_files[:1]]  # first file as validation

            self.logger.log_info(f"📊 Found {len(parquet_files)} training files")
            for i, f in enumerate(parquet_files):
                self.logger.log_info(f"  {i+1}. {f.name}")

            # ⭐ Initialize the VeRL trainer (with the actual data)
            if not hasattr(self, 'verl_trainer') or self.verl_trainer is None:
                self.logger.log_info("🚀 Initializing VeRL trainer with actual training data")

                # Free GPU memory before loading FSDP
                import torch
                torch.cuda.empty_cache()

                # Initialize the VeRL trainer
                self._initialize_verl_trainer(training_data_path)

                self.logger.log_info(f"✅ FSDP model loaded on GPU {self.available_gpus}")
                self.logger.log_info("✅ GPU sharing enabled: VLLM + FSDP on same GPUs")
            else:
                # An existing trainer is present; update its data paths
                self.logger.log_info("🔄 Updating existing VeRL trainer with new data files")
                self.verl_trainer.config.data.train_files = self.verl_config.data.train_files
                self.verl_trainer.config.data.val_files = self.verl_config.data.val_files

                # init_workers builds the dataloaders, so call it again
                self.logger.log_info("🔧 Re-initializing VeRL workers with new data...")
                self.verl_trainer.init_workers()
                self.logger.log_info("✅ VeRL workers re-initialized with actual training data")

            # Set the experiment name
            if experiment_name:
                self.verl_config.experiment.name = experiment_name
            else:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                self.verl_config.experiment.name = f"verl_only_round_{round_num}_{timestamp}"

            self.logger.log_info(f"🏷️ Experiment: {self.verl_config.experiment.name}")

            # Step 5: run VeRL PPO training
            self.logger.log_info("🎓 Starting VeRL PPO training...")
            start_time = datetime.now()

            # Train directly with the VeRL trainer
            try:
                if hasattr(self, 'verl_trainer') and self.verl_trainer is not None:
                    # Directory for responses generated during training (appended to the existing path layout)
                    llm_responses_dir = os.path.join(os.path.dirname(training_data_path), "llm_responses")
                    os.makedirs(llm_responses_dir, exist_ok=True)
                    self.logger.log_info(f"📝 LLM responses will be saved to: {llm_responses_dir}")

                    # Point VeRL's rollout data dir at it
                    self.verl_trainer.config.trainer.rollout_data_dir = llm_responses_dir

                    # State for custom logging callbacks
                    self.llm_responses_dir = llm_responses_dir
                    self.response_counter = 0

                    self.logger.log_info("🎯 Running VeRL PPO training...")
                    self.verl_trainer.fit()
                    training_result = {'success': True, 'model_path': self.current_model_path}
                    self.logger.log_info("✅ VeRL training completed successfully")

                    # Convert JSONL files to the TTRLVR format
                    if hasattr(self, 'llm_responses_dir') and os.path.exists(self.llm_responses_dir):
                        self.logger.log_info("📝 Converting VeRL outputs to TTRLVR format...")
                        jsonl_files = list(Path(self.llm_responses_dir).glob("*.jsonl"))
                        for jsonl_file in jsonl_files:
                            self._convert_jsonl_to_ttrlvr_format(str(jsonl_file), self.llm_responses_dir)
                        self.logger.log_info(f"✅ Converted {len(jsonl_files)} JSONL files to TTRLVR format")
                else:
                    raise ValueError("VeRL trainer not initialized")
            except Exception as e:
                self.logger.log_error(f"VeRL training failed: {e}")
                training_result = {'success': False, 'error': str(e)}

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            self.logger.log_info(f"⏱️ VeRL training completed in {duration:.1f} seconds")

            # Assemble the result
            result = {
                'success': training_result.get('success', False),
                'round': round_num,
                'experiment_name': self.verl_config.experiment.name,
                'training_data_path': training_data_path,
                'duration_seconds': duration,
                'start_time': start_time.isoformat(),
                'end_time': end_time.isoformat(),
                'training_files': len(parquet_files),
                # training_result is a dict, so use .get (getattr would always miss)
                'model_path': training_result.get('model_path', self.current_model_path),
                'details': training_result
            }

            if result['success']:
                self.logger.log_info("🎉 VeRL training completed successfully!")
                if 'model_path' in training_result:
                    self.current_model_path = training_result['model_path']
                    self.logger.log_info(f"🤖 Updated model path: {self.current_model_path}")
            else:
                self.logger.log_error("❌ VeRL training failed")
                self.logger.log_error(f"Error: {training_result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            self.logger.log_error(f"💥 VeRL-only training failed: {e}")
            import traceback
            traceback.print_exc()
            return {
                'success': False,
                'error': str(e),
                'round': round_num,
                'training_data_path': training_data_path
            }

    def _process_single_round(self,
                              benchmark_config: BenchmarkConfig,
                              problem_ids: List[str],
                              round_num: int) -> Dict[str, Any]:
        """Process a single round."""
        round_start_time = datetime.now()
        self.logger.log_info(f"🔄 ROUND {round_num} - Starting")
        try:
            # Delegate the round logic to _run_single_round
            return self._run_single_round(benchmark_config, problem_ids, round_num)
        except Exception as e:
            self.logger.log_error(f"Round {round_num} failed: {e}")
            return {
                'success': False,
                'error': str(e),
                'round': round_num,
                'duration_seconds': (datetime.now() - round_start_time).total_seconds()
            }

    def _run_single_round(self,
                          benchmark_config: BenchmarkConfig,
                          problem_ids: List[str],
                          round_num: int) -> Dict[str, Any]:
        """Run a single round: the TTRLVR pipeline executed via Ray."""
        round_result = {
            'round_num': round_num,
            'problems': {},
            'training_data_files': [],
            'success': False,
            'error': None,
            'stats': {
                'total_problems': len(problem_ids),
                'successful_problems': 0,
                'failed_problems': 0,
                'total_tasks': 0,
                'tasks_by_type': {'induction': 0, 'deduction': 0, 'abduction': 0}
            }
        }

        try:
            # Load the VeRL config (needed for Ray settings)
            if not hasattr(self, 'verl_config') or self.verl_config is None:
                self.logger.log_info("🔧 Loading VeRL config for Ray settings")
                self._load_verl_config()

            # Make sure the Ray cluster is initialized
            if not self.ray_initialized:
                self.logger.log_info("🚀 Initializing Ray for data generation")
                self._initialize_ray_cluster()

            # ⭐ The VeRL trainer is initialized in Step 5 with actual data;
            # here we only announce the GPU sharing plan
            if round_num == 1:
                self.logger.log_info("📌 VeRL trainer will be initialized in Step 5 with actual data")
                self.logger.log_info("🔧 GPU sharing plan: VLLM (GPU 1,2) + FSDP (GPU 0,1,2,3)")

            # Point the pipeline at the current model
            self._update_pipeline_model(self.current_model_path)

            successful_problems = 0

            # Always use sequential processing (cross-problem parallelism removed);
            # only VLLM batch parallelism within a single problem is used
            self.logger.log_info(f"📝 Using sequential processing for {len(problem_ids)} problems")
            self.logger.log_info("  - Multi-problem parallelization disabled")
            self.logger.log_info("  - Single-problem VLLM batch processing enabled")
            results = self._process_problems_sequential(benchmark_config, problem_ids, round_num)

            # Merge the results
            for problem_id, pipeline_result in results.items():
                round_result['problems'][problem_id] = pipeline_result

                if pipeline_result['success']:
                    successful_problems += 1

                    # Collect AZR training data files
                    if 'azr_training_data' in pipeline_result:
                        round_result['training_data_files'].append({
                            'problem_id': problem_id,
                            'files': pipeline_result['azr_training_data']
                        })

                    # Update statistics
                    if 'azr_data_saving' in pipeline_result['steps']:
                        total_tasks = pipeline_result['steps']['azr_data_saving']['total_tasks']
                        round_result['stats']['total_tasks'] += total_tasks

                    self.logger.log_info(f"✅ {problem_id} completed successfully")
                else:
                    self.logger.log_error(f"❌ {problem_id} failed: {pipeline_result.get('error', 'Unknown error')}")

            # Update round statistics
            round_result['stats']['successful_problems'] = successful_problems
            round_result['stats']['failed_problems'] = len(problem_ids) - successful_problems
            round_result['success'] = successful_problems > 0

            if successful_problems == 0:
                round_result['error'] = "No problems completed successfully"

            return round_result

        except Exception as e:
            round_result['error'] = str(e)
            return round_result

    def _initialize_pipeline(self):
        """Initialize the pipeline as a Ray actor (VeRL pattern)."""
        if self.remote_pipeline is None:
            try:
                # Config for the TTRLVR pipeline
                ttrlvr_config = self.config

                # Choose the inference engine based on the execution mode,
                # checking the rollout name from the VeRL config
                if (hasattr(self, 'verl_config') and self.verl_config
                        and hasattr(self.verl_config, 'actor_rollout_ref')):
                    rollout_name = self.verl_config.actor_rollout_ref.rollout.name
                    # Use vLLM only when the rollout is vLLM; otherwise HuggingFace
                    use_vllm = (rollout_name == "vllm")
                else:
                    # Default: vllm when distributed, huggingface when single_gpu
                    use_vllm = (self.execution_mode == "distributed")

                ttrlvr_config.use_vllm_for_data_generation = use_vllm
                engine_name = "vllm" if use_vllm else "huggingface"
                self.logger.log_info(f"🔧 TTRLVR data generation using: {engine_name} (execution_mode: {self.execution_mode})")

                # Config debugging
                self.logger.log_info(f"🔍 Config debug: num_program_variations = {ttrlvr_config.num_program_variations}")
                self.logger.log_info(f"🔍 Config debug: skip_task_evaluation = {getattr(ttrlvr_config, 'skip_task_evaluation', False)}")

                # Create the pipeline as a Ray actor (sized to the GPU count)
                from absolute_zero_reasoner.testtime.complete_pipeline import RemoteTestTimePipeline

                gpu_count = len(self.available_gpus)
                self.logger.log_info(f"🚀 Creating RemoteTestTimePipeline with {gpu_count} GPUs, model: {self.current_model_path}")

                # Pass the log file path through an environment variable
                if hasattr(self.logger, 'log_file_path') and self.logger.log_file_path:
                    runtime_env = {
                        "env_vars": {
                            "TTRLVR_LOG_FILE": self.logger.log_file_path,
                            "CUDA_VISIBLE_DEVICES": os.environ.get("CUDA_VISIBLE_DEVICES", "0")
                        }
                    }
                    self.logger.log_info(f"🔧 Setting TTRLVR_LOG_FILE for Ray worker: {self.logger.log_file_path}")
                else:
                    runtime_env = {
                        "env_vars": {
                            "CUDA_VISIBLE_DEVICES": os.environ.get("CUDA_VISIBLE_DEVICES", "0")
                        }
                    }
                    self.logger.log_warning("⚠️ Logger does not have log_file_path attribute, Ray worker will create its own log file")

                # ⭐ Do not let the actor monopolize GPUs (no num_gpus request);
                # VLLM internally uses only the first 2 GPUs via CUDA_VISIBLE_DEVICES.
                # Allocate the first 2 of the available GPUs to VLLM.
                if gpu_count >= 2:
                    # Resolve the actual GPU indices
                    cuda_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "0,1,2,3").split(',')
                    vllm_gpus = f"{cuda_devices[0]},{cuda_devices[1]}"  # first 2 GPUs
                else:
                    vllm_gpus = os.environ.get("CUDA_VISIBLE_DEVICES", "0")

                # Add the VLLM GPU restriction to the runtime env.
                # Set it explicitly as a string so Ray handles comma-separated env vars correctly.
                runtime_env["env_vars"]["CUDA_VISIBLE_DEVICES"] = str(vllm_gpus)
                runtime_env["env_vars"]["VLLM_USE_SPECIFIC_GPUS"] = str(vllm_gpus)

                self.logger.log_info("🎯 Creating Ray Actor without exclusive GPU allocation")
                self.logger.log_info(f"  - VLLM will use GPUs: {vllm_gpus} (via CUDA_VISIBLE_DEVICES)")
                self.logger.log_info(f"  - FSDP can use all GPUs: {os.environ.get('CUDA_VISIBLE_DEVICES', '0,1,2,3')}")

                # AZR style: create the Ray actor without GPU allocation;
                # GPUs are controlled solely through CUDA_VISIBLE_DEVICES
                RemoteTestTimePipelineWithGPUs = RemoteTestTimePipeline.options(
                    num_cpus=4,  # enough CPUs for VLLM processing
                    # num_gpus intentionally unset - GPUs controlled via CUDA_VISIBLE_DEVICES
                    runtime_env=runtime_env
                )
                self.remote_pipeline = RemoteTestTimePipelineWithGPUs.remote(
                    config=ttrlvr_config,
                    model_path=self.current_model_path
                )

                self.logger.log_info(f"🔄 RemoteTestTimePipeline initialized in {self.execution_mode} mode")
                if self.execution_mode == "distributed":
                    self.logger.log_info("  - Using VLLM distributed inference on GPU 0,1")
                    self.logger.log_info("  - FSDP can use all GPUs: 0,1,2,3 with sharing")
                    self.logger.log_info("  - Model loading handled inside Ray worker")
                else:
                    self.logger.log_info("  - Using HuggingFace single GPU inference inside Ray worker")

            except Exception as e:
                self.logger.log_error(f"Failed to initialize pipeline: {e}")
                raise

    def _update_pipeline_model(self, model_path: str):
        """Update the pipeline's model reference (keeping the same in-memory model)."""
        try:
            # Initialize the Ray actor pipeline if missing
            if self.remote_pipeline is None:
                self._initialize_pipeline()

            # Update the model path (the Ray worker is recreated for a new path)
            self.current_model_path = model_path
            self.logger.log_info(f"🔄 Pipeline model path updated to: {model_path}")

            # Recreate the Ray actor when the model path changed
            if hasattr(self, '_last_model_path') and self._last_model_path != model_path:
                self.logger.log_info("🔄 Model path changed, recreating Ray Actor")
                self.remote_pipeline = None
                self._initialize_pipeline()

            self._last_model_path = model_path

        except Exception as e:
            self.logger.log_warning(f"Failed to update pipeline model: {e}")

    def _train_azr_with_round_data(self,
                                   training_data_files: List[Dict[str, Any]],
                                   round_num: int) -> Optional[str]:
        """Update the in-memory model with this round's data."""
        try:
            # 1. Combine the round's data
            combined_data_path = self._combine_round_data(training_data_files, round_num)
            if not combined_data_path:
                self.logger.log_error(f"Failed to combine training data for round {round_num}")
                return None

            self.logger.log_info(f"📊 Combined training data: {combined_data_path}")

            # 2. Update the model directly in memory (no disk save/load)
            updated_model = self._update_model_in_memory(combined_data_path, round_num)

            if updated_model:
                # Update the in-memory model instance
                self.current_model = updated_model

                # Point the pipeline components at the updated model as well
                if self.complete_pipeline:
                    self.complete_pipeline.model = self.current_model
                    if hasattr(self.complete_pipeline, 'solution_generator') and self.complete_pipeline.solution_generator:
                        self.complete_pipeline.solution_generator.model = self.current_model
                    if hasattr(self.complete_pipeline, 'ipo_extractor') and self.complete_pipeline.ipo_extractor:
                        self.complete_pipeline.ipo_extractor.model = self.current_model

                # Update the reference path
                virtual_model_path = f"memory://round_{round_num}_model"
                self.logger.log_info(f"✅ Model updated in memory for round {round_num}")
                self.logger.log_info(f"🎯 Virtual model path: {virtual_model_path}")
                return virtual_model_path
            else:
                self.logger.log_error(f"❌ Model update failed for round {round_num}")
                return None

        except Exception as e:
            self.logger.log_error(f"💥 Model update execution failed: {e}")
            return None

    def _update_model_in_memory(self, training_data_path: str, round_num: int) -> Optional[Any]:
        """Update the model in memory with VeRL (AZR REINFORCE++ training)."""
        try:
            self.logger.log_info(f"🎓 Starting VeRL-based AZR training for round {round_num}")
            self.logger.log_info(f"📂 Training data: {training_data_path}")

            # Quick check: does the training data exist?
            task_files = ['induction.parquet', 'deduction.parquet', 'abduction.parquet']
            available_files = []

            for task_file in task_files:
                file_path = os.path.join(training_data_path, task_file)
                if os.path.exists(file_path):
                    available_files.append(task_file)
                    # Check the file size
                    file_size = os.path.getsize(file_path)
                    self.logger.log_info(f"  📄 {task_file}: {file_size} bytes")

            if not available_files:
                self.logger.log_warning("⚠️ No training data files found in specified directory")
                # Search for the actually generated data directory
                self.logger.log_info("🔍 Searching for actual training data...")
                actual_data_path = self._find_actual_training_data()
                if actual_data_path:
                    self.logger.log_info(f"✅ Found actual training data: {actual_data_path}")
                    training_data_path = actual_data_path
                    # Re-check the files
                    for task_file in task_files:
                        file_path = os.path.join(training_data_path, task_file)
                        if os.path.exists(file_path):
                            available_files.append(task_file)
                            file_size = os.path.getsize(file_path)
                            self.logger.log_info(f"  📄 {task_file}: {file_size} bytes")
                else:
                    self.logger.log_error("❌ No actual training data found anywhere")
                    return None

            self.logger.log_info(f"📚 Processing {len(available_files)} task types")

            # ⭐ Steps 1-4 are done, so release the VLLM Ray actor to free memory before Step 5
            if hasattr(self, 'remote_pipeline') and self.remote_pipeline is not None:
                self.logger.log_info("🧹 Releasing VLLM Ray Actor memory before Step 5...")
                try:
                    # First call cleanup to release internal resources
                    cleanup_result = ray.get(self.remote_pipeline.cleanup.remote())
                    if cleanup_result:
                        self.logger.log_info("✅ VLLM internal resources cleaned up")

                    # Then kill the Ray actor
                    ray.kill(self.remote_pipeline)
                    self.remote_pipeline = None
                    self.logger.log_info("✅ VLLM Ray Actor killed successfully")

                    # Clear GPU memory
                    import torch
                    torch.cuda.empty_cache()

                    # Wait briefly so the memory is fully released
                    import time
                    time.sleep(2)

                    # Report GPU memory status
                    if torch.cuda.is_available():
                        for i in range(torch.cuda.device_count()):
                            memory_allocated = torch.cuda.memory_allocated(i) / 1024**3
                            memory_reserved = torch.cuda.memory_reserved(i) / 1024**3
                            self.logger.log_info(f"  GPU {i}: Allocated={memory_allocated:.2f}GB, Reserved={memory_reserved:.2f}GB")

                except Exception as e:
                    self.logger.log_warning(f"⚠️ Error during VLLM cleanup: {e}")

            # Initialize the VeRL trainer (first round only)
            if self.verl_trainer is None:
                self._initialize_verl_trainer(training_data_path)
            else:
                # For later rounds, only update the data
                self._update_verl_trainer_data(training_data_path)

            if self.verl_trainer is None:
                self.logger.log_error("Failed to initialize VeRL trainer")
                return self.current_model

            # Run VeRL in-memory training
            self.logger.log_info(f"🚀 Starting in-memory VeRL training for round {round_num}")

            # Directory for responses generated during training
            llm_responses_dir = os.path.join(os.path.dirname(training_data_path), "llm_responses")
            os.makedirs(llm_responses_dir, exist_ok=True)
            self.logger.log_info(f"📝 LLM responses will be saved to: {llm_responses_dir}")

            # Point VeRL's rollout data dir at it
            self.verl_config.trainer.rollout_data_dir = llm_responses_dir

            # Adjust the epoch count dynamically if requested
            if hasattr(self, 'batch_epochs') and self.batch_epochs > 1:
                original_epochs = self.verl_config.trainer.total_epochs
                self.verl_config.trainer.total_epochs = self.batch_epochs
                self.logger.log_info(f"🔧 Adjusted epochs from {original_epochs} to {self.batch_epochs}")

            # Auto-calculate ppo_mini_batch_size like main_azr_ppo.py (important!)
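            # Worked example (hypothetical numbers, not read from any config):
            # with train_batch_size=32, problem_types=['code_i','code_o','code_f']
            # and train_propose=False, the calculation below gives
            #   ppo_mini_batch_size = 32 * 3 * 1 = 96
            # and with train_propose=True it doubles to 32 * 3 * 2 = 192.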
            train_batch_size = self.verl_config.data.train_batch_size
            problem_types = getattr(self.verl_config.azr, 'problem_types', ['code_i', 'code_o', 'code_f'])
            train_propose = getattr(self.verl_config.azr, 'train_propose', False)

            # Remember the original value
            original_ppo_mini_batch_size = self.verl_config.actor_rollout_ref.actor.ppo_mini_batch_size

            # Auto-calculate: train_batch_size * number of problem types * (2 if proposing)
            calculated_ppo_mini_batch_size = train_batch_size * len(problem_types) * (2 if train_propose else 1)
            self.verl_config.actor_rollout_ref.actor.ppo_mini_batch_size = calculated_ppo_mini_batch_size

            # Auto-calculate data_len as well (same as main_azr_ppo.py)
            update_iteration = getattr(self.verl_config.azr.data_selection_strategy, 'update_iteration', 1)
            self.verl_config.azr.data_selection_strategy.data_len = train_batch_size * update_iteration

            self.logger.log_info(f"🔧 Auto-calculated ppo_mini_batch_size: {original_ppo_mini_batch_size} → {calculated_ppo_mini_batch_size}")
            self.logger.log_info(f"  - train_batch_size: {train_batch_size}")
            self.logger.log_info(f"  - problem_types: {len(problem_types)} ({problem_types})")
            self.logger.log_info(f"  - train_propose: {train_propose}")
            self.logger.log_info(f"🔧 Auto-calculated data_len: {self.verl_config.azr.data_selection_strategy.data_len}")

            # Run VeRL training
            self.logger.log_info(f"🏃 Calling verl_trainer.fit() for round {round_num}")
            self.logger.log_info(f"📊 Config - total_epochs: {self.verl_config.trainer.total_epochs}")
            self.logger.log_info(f"📊 Config - train_batch_size: {self.verl_config.data.train_batch_size}")
            self.logger.log_info(f"📊 Config - total_training_steps: {self.verl_config.trainer.total_training_steps}")

            # Also update the trainer instance's own config (important!)
            if hasattr(self.verl_trainer, 'config'):
                self.verl_trainer.config.trainer.rollout_data_dir = llm_responses_dir

            # The actual fit call
            fit_start = datetime.now()
            self.verl_trainer.fit()
            fit_end = datetime.now()

            fit_duration = (fit_end - fit_start).total_seconds()
            self.logger.log_info(f"⏱️ verl_trainer.fit() completed in {fit_duration:.3f} seconds")

            # Convert JSONL files to the TTRLVR format
            if os.path.exists(llm_responses_dir):
                self.logger.log_info("📝 Converting VeRL outputs to TTRLVR format...")
                jsonl_files = list(Path(llm_responses_dir).glob("*.jsonl"))
                for jsonl_file in jsonl_files:
                    self._convert_jsonl_to_ttrlvr_format(str(jsonl_file), llm_responses_dir)
                self.logger.log_info(f"✅ Converted {len(jsonl_files)} JSONL files to TTRLVR format")

            # The trained model is already updated inside the VeRL trainer;
            # the model instance stays resident in memory
            self.logger.log_info(f"✅ Model updated successfully with REINFORCE++ for round {round_num}")

            # Save a checkpoint after training (conditionally)
            if self._should_save_checkpoint(round_num):
                self._save_round_checkpoint(round_num)

            # Return the current model object (weights updated).
            # With VeRL the model is updated inside Ray workers, so return a symbolic reference
            if self.current_model is None:
                self.current_model = "verl_trained_model"  # symbolic reference

            return self.current_model

        except Exception as e:
            self.logger.log_error(f"Failed to update model in memory: {e}")
            import traceback
            traceback.print_exc()
            return None

    def _combine_round_data(self,
                            training_data_files: List[Dict[str, Any]],
                            round_num: int) -> Optional[str]:
        """Combine every problem's data for the round, grouped by task type."""
        try:
            # Output directory for the combined data
            output_dir = f"/tmp/ttrlvr_azr_training/round_{round_num}"
            os.makedirs(output_dir, exist_ok=True)

            # Collect data per task type
            combined_data = {
                'induction': [],
                'deduction': [],
                'abduction': []
            }

            total_files_processed = 0

            # Iterate over each problem's data files
            for problem_data in training_data_files:
                problem_id = problem_data['problem_id']
                files = problem_data['files']

                self.logger.log_info(f"📁 Processing data for {problem_id}")

                # Process files per task type
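                # Expected shape of `files` (an assumption, based on how the
                # pipeline reports its outputs under 'azr_training_data'):
                #   {'induction': '.../induction.parquet',
                #    'deduction': '.../deduction.parquet',
                #    'abduction': '.../abduction.parquet'}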
                for task_type in ['induction', 'deduction', 'abduction']:
                    if task_type in files:
                        file_path = files[task_type]
                        if os.path.exists(file_path):
                            try:
                                df = pd.read_parquet(file_path)
                                task_data = df.to_dict('records')
                                combined_data[task_type].extend(task_data)
                                total_files_processed += 1
                                self.logger.log_info(f"  ✅ {task_type}: {len(task_data)} tasks from {file_path}")
                            except Exception as e:
                                self.logger.log_warning(f"  ⚠️ Failed to read {file_path}: {e}")
                        else:
                            self.logger.log_warning(f"  ⚠️ File not found: {file_path}")

            if total_files_processed == 0:
                self.logger.log_error("No training data files found to combine")
                return None

            # Save the combined data as one parquet file per task type
            combined_files = {}
            total_tasks = 0

            for task_type, data in combined_data.items():
                if data:
                    # Sort by ipo_group_id to guarantee batching
                    df = pd.DataFrame(data)
                    df = df.sort_values('ipo_group_id')

                    # Save the file
                    file_path = os.path.join(output_dir, f"{task_type}.parquet")
                    df.to_parquet(file_path, index=False)
                    combined_files[task_type] = file_path
                    total_tasks += len(data)

                    self.logger.log_info(f"💾 Saved {len(data)} {task_type} tasks to {file_path}")
                else:
                    self.logger.log_warning(f"No {task_type} tasks found for round {round_num}")

            # Save statistics
            stats = {
                'round': round_num,
                'total_tasks': total_tasks,
                'tasks_by_type': {k: len(v) for k, v in combined_data.items()},
                'files': combined_files,
                'problems_processed': len(training_data_files),
                'batch_groups': len(set(
                    task['ipo_group_id']
                    for task_data in combined_data.values()
                    for task in task_data
                ))
            }

            stats_file = os.path.join(output_dir, 'round_training_stats.json')
            with open(stats_file, 'w') as f:
                json.dump(stats, f, indent=2)

            self.logger.log_info(f"📊 Round {round_num} data summary:")
            self.logger.log_info(f"  - Total tasks: {total_tasks}")
            self.logger.log_info(f"  - Batch groups: {stats['batch_groups']}")
            self.logger.log_info(f"  - Files: {list(combined_files.keys())}")

            return output_dir

        except Exception as e:
            self.logger.log_error(f"Failed to combine round data: {e}")
            return None

    def _save_checkpoint(self, round_num: int, model_path: str, training_results: Dict[str, Any]):
        """Save a checkpoint (model state, training statistics, round info)."""
        try:
            checkpoint_path = os.path.join(self.checkpoint_dir, f"checkpoint_round_{round_num}")
            os.makedirs(checkpoint_path, exist_ok=True)

            # Checkpoint metadata
            checkpoint_data = {
                'round_num': round_num,
                'model_path': model_path,
                'timestamp': datetime.now().isoformat(),
                'total_rounds': training_results.get('total_rounds', 30),
                'completed_rounds': round_num,
                'training_results': training_results,
                'round_times': self.round_times
            }

            # Save as JSON
            checkpoint_file = os.path.join(checkpoint_path, 'checkpoint.json')
            with open(checkpoint_file, 'w') as f:
                json.dump(checkpoint_data, f, indent=2)

            # Save a summary text file
            summary_file = os.path.join(checkpoint_path, 'summary.txt')
            with open(summary_file, 'w') as f:
                f.write(f"TTRLVR + AZR Training Checkpoint - Round {round_num}\n")
                f.write("=" * 60 + "\n\n")
                f.write(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Completed Rounds: {round_num}/{training_results.get('total_rounds', 30)}\n")
                f.write(f"Current Model: {model_path}\n")
                f.write(f"Total Training Time: {sum(self.round_times.values()):.1f} seconds\n\n")

                # Per-round statistics
                f.write("Round Statistics:\n")
                f.write("-" * 20 + "\n")
                for r_num, r_time in self.round_times.items():
                    if r_num <= round_num:
                        round_result = training_results['rounds'].get(r_num, {})
                        success = "✅" if round_result.get('success', False) else "❌"
                        f.write(f"Round {r_num:2d}: {success} ({r_time:.1f}s)\n")

            self.logger.log_info(f"💾 Checkpoint saved: {checkpoint_path}")

        except Exception as e:
            self.logger.log_error(f"Failed to save checkpoint: {e}")

    def _load_checkpoint(self, round_num: int) -> Optional[str]:
        """Load a model from a checkpoint."""
        try:
            checkpoint_path = os.path.join(self.checkpoint_dir, f"checkpoint_round_{round_num}")
            checkpoint_file = os.path.join(checkpoint_path, 'checkpoint.json')

            if not os.path.exists(checkpoint_file):
                self.logger.log_warning(f"Checkpoint not found: {checkpoint_file}")
                return None

            with open(checkpoint_file, 'r') as f:
                checkpoint_data = json.load(f)

            model_path = checkpoint_data.get('model_path')
            if model_path and os.path.exists(model_path):
                self.logger.log_info(f"📂 Loaded checkpoint from round {round_num}")
                self.logger.log_info(f"🤖 Model: {model_path}")

                # Restore previous round times
                if 'round_times' in checkpoint_data:
                    self.round_times.update(checkpoint_data['round_times'])

                return model_path
            else:
                self.logger.log_warning(f"Model path in checkpoint does not exist: {model_path}")
                return None

        except Exception as e:
            self.logger.log_error(f"Failed to load checkpoint: {e}")
            return None

    def _log_round_summary(self, round_num: int, round_result: Dict[str, Any], duration: float):
        """Log a summary when a round finishes."""
        stats = round_result.get('stats', {})

        self.logger.log_info("")
        self.logger.log_info(f"📊 ROUND {round_num} SUMMARY")
        self.logger.log_info("=" * 50)
        self.logger.log_info(f"⏱️ Duration: {duration:.1f} seconds")
        self.logger.log_info(f"📝 Problems: {stats.get('successful_problems', 0)}/{stats.get('total_problems', 0)} successful")
        self.logger.log_info(f"🎯 Total tasks: {stats.get('total_tasks', 0)}")

        tasks_by_type = stats.get('tasks_by_type', {})
        for task_type, count in tasks_by_type.items():
            if count > 0:
                self.logger.log_info(f"  - {task_type}: {count}")

        if round_result.get('success'):
            self.logger.log_info(f"✅ Round {round_num} completed successfully")
        else:
            self.logger.log_info(f"❌ Round {round_num} failed")
        self.logger.log_info("")

    def _log_final_summary(self, training_results: Dict[str, Any]):
        """Log a summary when the whole training run finishes."""
        total_duration = training_results.get('total_duration_seconds', 0)
        total_rounds = training_results.get('total_rounds', 0)
        completed_rounds = len(training_results.get('rounds', {}))

        self.logger.log_info("")
        self.logger.log_info("🎉 TTRLVR + AZR TRAINING COMPLETED")
        self.logger.log_info("=" * 60)
        self.logger.log_info(f"⏱️ Total Duration: {total_duration:.1f} seconds ({total_duration/3600:.1f} hours)")
        self.logger.log_info(f"🔄 Completed Rounds: {completed_rounds}/{total_rounds}")
        self.logger.log_info(f"🤖 Final Model: {training_results.get('final_model', 'N/A')}")

        # Per-round success/failure statistics
        successful_rounds = 0
        failed_rounds = 0
        for round_result in training_results['rounds'].values():
            if round_result.get('success'):
                successful_rounds += 1
            else:
                failed_rounds += 1

        self.logger.log_info("📊 Round Statistics:")
        self.logger.log_info(f"  - Successful: {successful_rounds}")
        self.logger.log_info(f"  - Failed: {failed_rounds}")
        self.logger.log_info(f"  - Success Rate: {successful_rounds/completed_rounds*100:.1f}%")

        # Average round time
        if self.round_times:
            avg_round_time = sum(self.round_times.values()) / len(self.round_times)
            self.logger.log_info(f"⌛ Average Round Time: {avg_round_time:.1f} seconds")

        self.logger.log_info("")
        self.logger.log_info(f"💾 All results saved to: {self.checkpoint_dir}")
        self.logger.log_info("🎯 Training completed successfully!")
        self.logger.log_info("")

    def _initialize_verl_trainer(self, training_data_path: str):
        """Initialize the VeRL trainer and Ray cluster on the first round."""
        try:
            self.logger.log_info("🚀 Initializing VeRL trainer for AZR training")

            # Initialize Ray (once per session)
            if not self.ray_initialized:
                self.logger.log_info("🚀 Initializing Ray cluster for first time")
                self._initialize_ray_cluster()
            else:
                self.logger.log_info("♻️ Using existing Ray cluster")

            # Load the VeRL config if not loaded yet
            if not hasattr(self, 'verl_config') or self.verl_config is None:
                self._load_verl_config()

            # Set the data paths dynamically (list of parquet files)
            train_files = [
                os.path.join(training_data_path, "induction.parquet"),
                os.path.join(training_data_path, "deduction.parquet"),
                os.path.join(training_data_path, "abduction.parquet")
            ]

            # Keep only the files that exist
            valid_train_files = [f for f in train_files if os.path.exists(f)]
            self.verl_config.data.train_files = valid_train_files
            self.verl_config.data.val_files = valid_train_files

            # No unique checkpoint directory needed since checkpointing is disabled

            # Have VeRL reuse the existing model instance (memory saving);
            # keep the original path in the config (for tokenizer loading)
            self.verl_config.actor_rollout_ref.model.path = self.original_model_name
            self.logger.log_info(f"🔧 VeRL config set to original model path: {self.original_model_name}")

            # Engine used by TTRLVR data generation
            inference_engine = getattr(self.verl_config.data, 'ttrlvr_inference_engine', 'vllm')
            self.logger.log_info(f"🔧 TTRLVR inference engine: {inference_engine}")

            self.logger.log_info("📁 VeRL config loaded successfully")
            self.logger.log_info(f"📂 Training data files: {len(self.verl_config.data.train_files)}")

            # Components required to create the VeRL trainer
            from transformers import AutoTokenizer
            from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role
            # Worker classes are imported per strategy below (same as main_azr_ppo.py)

            # Route VeRL logging into the TTRLVR log
            import logging

            # The VeRL-related loggers to configure
            verl_loggers = [
                "verl",
                "verl.workers",
                "verl.trainer",
                "verl.workers.fsdp_workers",
                "verl.workers.sharding_manager",
                "absolute_zero_reasoner.trainer.ppo"
            ]

            # Append VeRL logs to the TTRLVR log file
            if hasattr(self.logger, 'log_file_path') and self.logger.log_file_path:
                file_handler = logging.FileHandler(self.logger.log_file_path)
                file_handler.setFormatter(logging.Formatter('[VeRL] %(asctime)s - %(name)s - %(levelname)s - %(message)s'))
                for logger_name in verl_loggers:
                    verl_logger = logging.getLogger(logger_name)
                    verl_logger.setLevel(logging.INFO)
                    verl_logger.addHandler(file_handler)

            # Check the strategy
            strategy = self.verl_config.actor_rollout_ref.actor.strategy
            self.logger.log_info(f"🔧 Actor strategy: {strategy}")

            if strategy in ["fsdp", "fsdp2"]:
                from verl.single_controller.ray import RayWorkerGroup
                from verl.workers.fsdp_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker

                # Pick the VeRL worker (like AZR, a fresh vLLM is created each time)
                actor_rollout_cls = (
                    AsyncActorRolloutRefWorker
                    if self.verl_config.actor_rollout_ref.rollout.mode == "async"
                    else ActorRolloutRefWorker
                )
                ray_worker_group_cls = RayWorkerGroup
            elif strategy == "none":
                # Single-GPU environment: use FSDP workers with FSDP disabled
                self.logger.log_info("🔧 Using single GPU configuration (FSDP workers without FSDP)")
                from verl.single_controller.ray import RayWorkerGroup
                from verl.workers.fsdp_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker

                # FSDP workers still work on a single GPU (FSDP disabled internally)
                actor_rollout_cls = (
                    AsyncActorRolloutRefWorker
                    if self.verl_config.actor_rollout_ref.rollout.mode == "async"
                    else ActorRolloutRefWorker
                )
                ray_worker_group_cls = RayWorkerGroup
            else:
                raise NotImplementedError(f"Strategy '{strategy}' not supported. Supported: fsdp, fsdp2, none")

            # Tokenizer initialization (use the original model path)
            if self.current_model_path.startswith('memory://'):
                # Virtual path: fall back to the original model path
                tokenizer_path = self.original_model_name
                self.logger.log_info(f"🔧 Using original model path for tokenizer: {self.original_model_name}")
            else:
                tokenizer_path = self.current_model_path

            tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token

            # Role mapping (wrapped with ray.remote, same as main_azr_ppo.py)
            role_worker_mapping = {
                Role.ActorRollout: ray.remote(actor_rollout_cls),
                Role.Critic: ray.remote(CriticWorker),
            }

            # Resource pool manager (same API as main_azr_ppo.py)
            global_pool_id = "global_pool"
            resource_pool_spec = {
                global_pool_id: [self.verl_config.trainer.n_gpus_per_node] * self.verl_config.trainer.nnodes,
            }
            mapping = {
                Role.ActorRollout: global_pool_id,
                Role.Critic: global_pool_id,
            }
            resource_pool_manager = ResourcePoolManager(
                resource_pool_spec=resource_pool_spec,
                mapping=mapping
            )

            # ⭐ Key step: pre-compute total_training_steps before creating the trainer
            self.logger.log_info("🔢 Pre-calculating total_training_steps before trainer creation")

            # Estimate the dataloader sizes from the parquet files
            task_files = ['induction.parquet', 'deduction.parquet', 'abduction.parquet']
            task_dataloader_sizes = []

            for task_file in task_files:
                file_path = os.path.join(training_data_path, task_file)
                if os.path.exists(file_path):
                    df = pd.read_parquet(file_path)
                    train_batch_size = self.verl_config.data.train_batch_size
                    task_dataloader_size = (len(df) + train_batch_size - 1) // train_batch_size
                    task_dataloader_sizes.append(task_dataloader_size)
                    self.logger.log_info(f"  📄 {task_file}: {len(df)} samples → {task_dataloader_size} batches")

            # TTRLVR draws a batch from every task at once, so use the minimum
            if task_dataloader_sizes:
                estimated_dataloader_size = min(task_dataloader_sizes)
                estimated_total_training_steps = estimated_dataloader_size * self.verl_config.trainer.total_epochs
                self.logger.log_info(f"  🔢 Min dataloader size: {estimated_dataloader_size}, Total steps: {estimated_total_training_steps}")
            else:
                estimated_dataloader_size = 0
                estimated_total_training_steps = 100  # fallback

            self.logger.log_info("📊 Pre-calculated training steps:")
            self.logger.log_info(f"  - Task dataloader sizes: {task_dataloader_sizes}")
            self.logger.log_info(f"  - Min dataloader size: {estimated_dataloader_size}")
            self.logger.log_info(f"  - Total epochs: {self.verl_config.trainer.total_epochs}")
            self.logger.log_info(f"  - Estimated total_training_steps: {estimated_total_training_steps}")

            # Inject into the VeRL config up front (before the trainer is created!)
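            # Note: with struct mode enabled, OmegaConf rejects keys the YAML
            # did not define; open_dict() temporarily lifts that restriction so
            # total_training_steps can be injected even when absent.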
            from omegaconf import OmegaConf, open_dict
            OmegaConf.set_struct(self.verl_config, True)
            with open_dict(self.verl_config):
                # Inject into the actor optimizer
                self.verl_config.actor_rollout_ref.actor.optim.total_training_steps = estimated_total_training_steps
                # Also inject at the trainer level (VeRL reads this value)
                self.verl_config.trainer.total_training_steps = estimated_total_training_steps
                # Inject for the critic when it is used
                if hasattr(self.verl_config, 'critic') and self.verl_config.critic.get('include_critic', False):
                    self.verl_config.critic.optim.total_training_steps = estimated_total_training_steps

            self.logger.log_info(f"✅ Injected total_training_steps={estimated_total_training_steps} into config before trainer creation")

            # Verify the injected values
            actor_value = OmegaConf.select(self.verl_config, "actor_rollout_ref.actor.optim.total_training_steps")
            trainer_value = OmegaConf.select(self.verl_config, "trainer.total_training_steps")
            self.logger.log_info(f"🔍 Verification: actor.optim.total_training_steps = {actor_value}")
            self.logger.log_info(f"🔍 Verification: trainer.total_training_steps = {trainer_value}")

            # Create the VeRL trainer (same as main_azr_ppo.py)
            self.logger.log_info("🚀 Creating new VLLM for VeRL (AZR pattern)")
            self.verl_trainer = CodeIORayPPOTrainer(
                config=self.verl_config,
                tokenizer=tokenizer,
                role_worker_mapping=role_worker_mapping,
                resource_pool_manager=resource_pool_manager,
                ray_worker_group_cls=ray_worker_group_cls
            )

            # ⭐ Key step: initialize the workers (same as main_azr_ppo.py)
            self.logger.log_info("🔧 Initializing VeRL workers...")
            self.verl_trainer.init_workers()
            self.logger.log_info("✅ VeRL workers initialized")

            # ⭐ Verification: compare against the actual dataloader size
            self.logger.log_info("🔍 Verifying dataloader after trainer creation:")
            if hasattr(self.verl_trainer, 'train_dataloader'):
                actual_dataloader_size = len(self.verl_trainer.train_dataloader) if self.verl_trainer.train_dataloader else 0
                self.logger.log_info(f"  - Actual dataloader size: {actual_dataloader_size}")
                self.logger.log_info(f"  - Estimated dataloader size: {estimated_dataloader_size}")

                if actual_dataloader_size != estimated_dataloader_size:
                    self.logger.log_warning(f"⚠️ Dataloader size mismatch! Estimated: {estimated_dataloader_size}, Actual: {actual_dataloader_size}")
                else:
                    self.logger.log_info("✅ Dataloader size estimation was correct")
            else:
                self.logger.log_warning("⚠️ No train_dataloader found after trainer creation")

            # ⭐ Key step: replace the VeRL trainer's model with the existing instance (memory saving)
            self._replace_verl_model_with_existing_instance()

            self.logger.log_info("✅ VeRL trainer initialized successfully")

        except Exception as e:
            self.logger.log_error(f"Failed to initialize VeRL trainer: {e}")
            import traceback
            traceback.print_exc()
            self.verl_trainer = None

    def _replace_verl_model_with_existing_instance(self):
        """Replace the VeRL trainer's model with the existing instance to save memory."""
        try:
            self.logger.log_info("🔄 Replacing VeRL models with existing instance for memory efficiency")

            # Replace the actor model
            if hasattr(self.verl_trainer, 'actor_rollout_ref'):
                if hasattr(self.verl_trainer.actor_rollout_ref, 'actor'):
                    if hasattr(self.verl_trainer.actor_rollout_ref.actor, 'model'):
                        # Delete VeRL's own model (free memory)
                        del self.verl_trainer.actor_rollout_ref.actor.model
                        # Swap in the existing instance
                        self.verl_trainer.actor_rollout_ref.actor.model = self.current_model
                        self.logger.log_info("✅ Actor model replaced with existing instance")

                # Handle the rollout model the same way if needed
                if hasattr(self.verl_trainer.actor_rollout_ref, 'rollout'):
                    if hasattr(self.verl_trainer.actor_rollout_ref.rollout, 'llm'):
                        # A VLLM engine is present: connect it to the existing weights
                        self.logger.log_info("🔧 Rollout engine detected - using existing model weights")

            # Clean up memory
            import torch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                current_memory = torch.cuda.memory_allocated() / 1024**3
                self.logger.log_info(f"📊 GPU memory after model replacement: {current_memory:.1f}GB")

            self.logger.log_info("🎯 Single model instance now used across all steps (1-5)!")

        except Exception as e:
            self.logger.log_warning(f"Model replacement failed, using default VeRL behavior: {e}")
            # Even on failure, VeRL falls back to its own model, so continue

    def _update_verl_trainer_data(self, training_data_path: str):
        """Update only the data paths on an existing VeRL trainer."""
        try:
            self.logger.log_info("🔄 Updating VeRL trainer data for new round")

            # New data paths
            new_train_files = [
                os.path.join(training_data_path, "induction.parquet"),
                os.path.join(training_data_path, "deduction.parquet"),
                os.path.join(training_data_path, "abduction.parquet")
            ]

            # Keep only the files that exist
            valid_files = [f for f in new_train_files if os.path.exists(f)]
            if not valid_files:
                self.logger.log_warning("⚠️ No valid training files found for update")
                return

            # Update the config
            self.verl_config.data.train_files = valid_files
            self.verl_config.data.val_files = valid_files

            # Update the trainer's dataloader
            if hasattr(self.verl_trainer, 'update_data_files'):
                self.verl_trainer.update_data_files(valid_files)
            else:
                # Update the trainer's internal config
                self.verl_trainer.config.data.train_files = valid_files
                self.verl_trainer.config.data.val_files = valid_files

            self.logger.log_info(f"✅ Updated training data: {len(valid_files)} files")

            # ⭐ Important: the data changed, so the workers must be re-initialized
            self.logger.log_info("🔧 Re-initializing VeRL workers with new data...")
            self.verl_trainer.init_workers()
            self.logger.log_info("✅ VeRL workers re-initialized with actual training data")

        except Exception as e:
            self.logger.log_error(f"Failed to update VeRL trainer data: {e}")
            import traceback
            traceback.print_exc()

    def _load_verl_config(self):
        """Load the VeRL config from an existing YAML file."""
        try:
            # Select the config file (chosen automatically by execution mode)
            if self.verl_config_path:
                config_path = self.verl_config_path
            else:
                config_path = self._get_default_config_path()

            self.logger.log_info(f"🔧 Loading VeRL config from: {config_path}")
            self.logger.log_info(f"🔧 Config selected for {self.execution_mode} mode")

            from omegaconf import OmegaConf

            if not os.path.exists(config_path):
                raise FileNotFoundError(f"Config file not found: {config_path}")

            # Load the YAML file
            self.verl_config = OmegaConf.load(config_path)

            # Point the model path at the actual HuggingFace path
            if hasattr(self, 'current_model_path') and self.current_model_path:
                if self.current_model_path.startswith('memory://'):
                    # Virtual path: fall back to the original model path
                    model_path_for_verl = self.original_model_name
                    self.logger.log_info(f"🔧 Using original model path for VeRL: {model_path_for_verl}")
                else:
                    model_path_for_verl = self.current_model_path

                self.verl_config.actor_rollout_ref.model.path = model_path_for_verl
                self.logger.log_info(f"🔧 Updated VeRL model path to: {model_path_for_verl}")

            self.logger.log_info("✅ VeRL config loaded successfully from YAML")
            self.logger.log_info(f"  - TTRLVR Ray parallel processing: {self.verl_config.data.ttrlvr_ray_config.parallel_processing}")
            self.logger.log_info(f"  - TTRLVR inference engine: {self.verl_config.data.ttrlvr_inference_engine}")

        except Exception as e:
            self.logger.log_error(f"Config loading failed: {e}")
            self.verl_config = None

    def _detect_available_gpus(self):
        """Detect the list of available GPUs."""
        cuda_visible_devices = os.environ.get('CUDA_VISIBLE_DEVICES', '0')
        if cuda_visible_devices:
            return [int(gpu.strip()) for gpu in cuda_visible_devices.split(',') if gpu.strip()]
        else:
            return [0]  # default

    def _determine_execution_mode(self):
        """Decide the execution mode from the GPU count."""
        num_gpus = len(self.available_gpus)
        if num_gpus == 1:
            return "single_gpu"   # whole pipeline on a single GPU
        else:
            return "distributed"  # whole pipeline across GPUs

    def _get_default_config_path(self):
        """Return the default config file path for the execution mode."""
        base_path = "/home/ubuntu/RLVR/TestTime-RLVR-v2/test/configs"
        if self.execution_mode == "single_gpu":
            return f"{base_path}/ttrlvr_azr_ppo.yaml"       # single-GPU config
        else:
            return f"{base_path}/ttrlvr_azr_ppo_4gpu.yaml"  # multi-GPU config

    def _should_save_checkpoint(self, round_num: int) -> bool:
        """Decide whether to save a checkpoint this round."""
        if self.save_every_round:
            return True
        if round_num % self.save_round_interval == 0:
            return True
        return False

    def _convert_jsonl_to_ttrlvr_format(self, jsonl_path: str, output_dir: str):
        """Convert VeRL's JSONL output into individual TTRLVR-format text files."""
        try:
            with open(jsonl_path, 'r') as f:
                data = json.load(f)

            # Create one file per sample
            for i in range(len(data.get('input', []))):
                prompt = data['input'][i] if 'input' in data else ""
                response = data['output'][i] if 'output' in data else ""
                score = data['score'][i] if 'score' in data else 0.0

                # Infer the task type from the prompt (induction/deduction/abduction)
                task_type = "unknown"
                if "induction" in prompt.lower() or "input/output pairs" in prompt:
                    task_type = "induction"
                elif "deduction" in prompt.lower() or "observed output" in prompt:
                    task_type = "deduction"
                elif "abduction" in prompt.lower() or "which input produces" in prompt:
                    task_type = "abduction"

                # Create a subdirectory per task type
                task_dir = os.path.join(output_dir, task_type)
                os.makedirs(task_dir, exist_ok=True)

                # Build the file name
                filename = f"verl_training_{task_type}_{self.response_counter}_response.txt"
                filepath = os.path.join(task_dir, filename)

                # Extract the per-sample step safely ('step' may be missing)
                step = data['step'][i] if 'step' in data and i < len(data['step']) else 0

                # Save in TTRLVR format
                with open(filepath, 'w') as f:
                    f.write(f"Task Type: {task_type}\n")
                    f.write(f"Task ID: verl_step_{step}_{i}\n")
                    f.write(f"Generated: {datetime.now().strftime('%Y%m%d_%H%M%S')}\n")
                    f.write("=" * 80 + "\n")
                    f.write("ORIGINAL PROMPT:\n")
                    f.write("=" * 80 + "\n")
                    f.write(prompt + "\n")
                    f.write("=" * 80 + "\n")
                    f.write("LLM RESPONSE:\n")
                    f.write("=" * 80 + "\n")
                    f.write(response + "\n")
                    f.write("=" * 80 + "\n")
                    f.write("REWARD SCORE:\n")
                    f.write("=" * 80 + "\n")
                    f.write(f"Score: {score:.3f}\n")

                    # Include any extra per-sample fields
                    for key in data.keys():
                        if key not in ['input', 'output', 'score', 'step'] and isinstance(data[key], list):
                            f.write("=" * 80 + "\n")
                            f.write(f"{key.upper()}:\n")
                            f.write("=" * 80 + "\n")
                            f.write(f"{data[key][i] if i < len(data[key]) else 'N/A'}\n")

                self.response_counter += 1

        except Exception as e:
            self.logger.log_error(f"Failed to convert JSONL to TTRLVR format: {e}")

    def _save_round_checkpoint(self, round_num: int):
        """Save a VeRL checkpoint for the round."""
        try:
            if hasattr(self, 'verl_trainer') and self.verl_trainer:
                # Call the VeRL trainer's checkpoint-saving method
                checkpoint_path = f"checkpoint_round_{round_num}"

                # Update the save path in the trainer config
                original_dir = self.verl_trainer.config.trainer.default_local_dir
                round_checkpoint_dir = f"{original_dir}/{checkpoint_path}"

                # Temporarily switch the save path
                self.verl_trainer.config.trainer.default_local_dir = round_checkpoint_dir

                # Save the checkpoint
                self.verl_trainer._save_checkpoint()

                # Restore the original path
                self.verl_trainer.config.trainer.default_local_dir = original_dir

                self.logger.log_info(f"💾 Round {round_num} checkpoint saved to: {round_checkpoint_dir}")
                return round_checkpoint_dir
            else:
                self.logger.log_warning("⚠️ VeRL trainer not available for checkpoint saving")
                return None

        except Exception as e:
            self.logger.log_error(f"Failed to save round {round_num} checkpoint: {e}")
            import traceback
            traceback.print_exc()
            return None

    def _initialize_ray_cluster(self):
        """Initialize the Ray cluster (once per session)."""
        try:
            # Check whether Ray is already initialized
            if ray.is_initialized():
                self.logger.log_info("⚠️ Ray already initialized, using existing cluster")
                self.ray_initialized = True
                return

            self.logger.log_info("🚀 Initializing Ray cluster with all GPUs for shared usage")

            # GPU settings
            cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "0")
            self.logger.log_info(f"🎯 Ray initialization with CUDA_VISIBLE_DEVICES: {cuda_visible_devices}")

            # GPU count check
            available_gpus = cuda_visible_devices.split(',') if cuda_visible_devices else ['0']
            self.logger.log_info(f"🎯 Available GPUs: {available_gpus} (count: {len(available_gpus)})")

            # Ray settings from the VeRL config
            ray_config = getattr(self.verl_config, 'ray_init', None) if hasattr(self, 'verl_config') and self.verl_config else None

            # Initialize Ray. As in AZR, the GPU count is not declared:
            # Ray does not manage the GPUs directly; CUDA_VISIBLE_DEVICES does.
            ray.init(
                runtime_env={"env_vars": {
                    "TOKENIZERS_PARALLELISM": "true",
                    "NCCL_DEBUG": "WARN",
                    "VLLM_LOGGING_LEVEL": "WARN",
                    "VERL_LOGGING_LEVEL": "INFO",  # VeRL logging level
                    "VLLM_ALLOW_RUNTIME_LORA_UPDATING": "true",
                    "CUDA_VISIBLE_DEVICES": cuda_visible_devices
                }},
                num_cpus=ray_config.num_cpus if ray_config else 16,  # same as the AZR config
                # num_gpus intentionally unset - AZR pattern
                ignore_reinit_error=True  # ignore re-initialization errors
            )

            self.ray_initialized = True
            self.logger.log_info("✅ Ray cluster initialized successfully")
            self.logger.log_info(f"  - GPUs available via CUDA: {cuda_visible_devices}")
            self.logger.log_info(f"  - CPUs: {ray_config.num_cpus if ray_config else 16}")
            self.logger.log_info("  - GPU management: CUDA_VISIBLE_DEVICES (not Ray)")
            self.logger.log_info("  - GPU sharing enabled: VLLM (GPU 0,1) + FSDP (GPU 0,1,2,3)")

        except Exception as e:
            self.logger.log_error(f"Failed to initialize Ray cluster: {e}")
            import traceback
            traceback.print_exc()
            raise

    def _process_problems_sequential(self,
                                     benchmark_config: BenchmarkConfig,
                                     problem_ids: List[str],
                                     round_num: int) -> Dict[str, Dict]:
        """Process problems sequentially (the original approach)."""
        results = {}

        for i, problem_id in enumerate(problem_ids):
            self.logger.log_info(f"📝 Processing problem {i+1}/{len(problem_ids)}: {problem_id}")

            try:
                # Initialize the Ray actor pipeline if needed
                if self.remote_pipeline is None:
                    self._initialize_pipeline()

                # Run the complete pipeline on the Ray actor (remote call)
                pipeline_result = ray.get(self.remote_pipeline.run_complete_pipeline.remote(
                    benchmark_config, problem_id, round_num, self.session_timestamp
                ))
                results[problem_id] = pipeline_result

            except Exception as e:
                self.logger.log_error(f"💥 Failed to process {problem_id}: {e}")
                results[problem_id] = {
                    'success': False,
                    'error': str(e)
                }

        return results

    def _process_problems_parallel(self,
                                   benchmark_config: BenchmarkConfig,
                                   problem_ids: List[str],
                                   round_num: int) -> Dict[str, Dict]:
        """[DEPRECATED] Parallel problem processing with Ray - currently unused.

        Note: cross-problem parallelism is disabled; only VLLM batch
        processing within a single problem is used.
        """
        try:
            # TTRLVR Ray settings from the VeRL config
            ray_config = getattr(self.verl_config.data, 'ttrlvr_ray_config', {}) if hasattr(self, 'verl_config') and self.verl_config else {}

            # Is parallel processing enabled?
            parallel_enabled = ray_config.get('parallel_processing', False)
            max_concurrent = ray_config.get('max_concurrent_problems', 4)

            if not parallel_enabled or len(problem_ids) <= 1:
                self.logger.log_info("📝 Using sequential processing (parallel_processing=False or single problem)")
                return self._process_problems_sequential(benchmark_config, problem_ids, round_num)

            # Actual Ray parallel processing
            self.logger.log_info(f"🚀 Using Ray parallel processing for {len(problem_ids)} problems")
            self.logger.log_info(f"  - Max concurrent: {min(max_concurrent, len(problem_ids))}")

            # Parallel TTRLVR pipeline processing with Ray actors
            cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "0")
            available_gpus = cuda_visible_devices.split(',') if cuda_visible_devices else ['0']

            @ray.remote(num_gpus=1)
            class TTRLVRPipelineActor:
                def __init__(self, config, logger_config, gpu_id=0):
                    # GPU setup first
                    import os
                    import torch

                    # Resolve the actual GPU index from the environment
                    cuda_devices = os.environ.get('CUDA_VISIBLE_DEVICES', '0')
                    available_gpus = cuda_devices.split(',') if cuda_devices else ['0']
                    actual_gpu = available_gpus[gpu_id % len(available_gpus)]

                    # Restrict this process to its assigned GPU
                    os.environ['CUDA_VISIBLE_DEVICES'] = actual_gpu

                    # Force CUDA initialization
                    if torch.cuda.is_available():
                        torch.cuda.set_device(0)  # locally always 0 (maps to actual_gpu)

                    print(f"🎯 Actor initialized on GPU {actual_gpu} (local:0)")

                    # Initialize the TTRLVR pipeline
                    from absolute_zero_reasoner.testtime.complete_pipeline import CompleteTestTimePipeline
                    from absolute_zero_reasoner.testtime.logger import TestTimeLogger

                    # Recreate the logger (avoids serialization issues)
                    logger = TestTimeLogger(log_dir=logger_config.get('log_dir', '/tmp'))

                    # Load the model (independently per actor)
                    model, tokenizer = self._load_pipeline_model(config)

                    self.pipeline = CompleteTestTimePipeline(
                        model=model,
                        tokenizer=tokenizer,
                        config=config,
                        logger=logger
                    )

                def _load_pipeline_model(self, config):
                    """Load the model independently in each actor."""
                    from absolute_zero_reasoner.testtime.solution_generator import InitialSolutionGenerator
                    import os

                    # Engine selection (config default)
                    use_vllm = getattr(config, 'use_vllm_for_data_generation', True)

                    # Use the GPU assigned to this actor
                    device = 'cuda:0'  # locally always 0 (actual GPU controlled via CUDA_VISIBLE_DEVICES)
                    print(f"🔄 Loading model on device {device} (actual GPU: {os.environ.get('CUDA_VISIBLE_DEVICES', 'unknown')})")

                    return InitialSolutionGenerator.load_model_with_optimizations(
                        config.model_name, device, config, use_vllm=use_vllm
                    )

                def process_problem(self, benchmark_config, problem_id, round_num, session_timestamp):
                    """Process a single problem."""
                    try:
                        result = self.pipeline.run_complete_pipeline(
                            benchmark_config, problem_id, round_num, session_timestamp
                        )
                        return problem_id, result
                    except Exception as e:
                        return problem_id, {
                            'success': False,
                            'error': str(e)
                        }

            # Create actors (bounded by max concurrency and GPU count)
            num_actors = min(max_concurrent, len(problem_ids), len(available_gpus))
            self.logger.log_info(f"🎭 Creating {num_actors} Ray actors across {len(available_gpus)} GPUs")
            self.logger.log_info(f"  - Available GPUs: {available_gpus}")
            self.logger.log_info(f"  - Debug: max_concurrent={max_concurrent}, len(problem_ids)={len(problem_ids)}, len(available_gpus)={len(available_gpus)}")

            # Serialize the logger settings
            logger_config = {
                'log_dir': self.logger.log_dir if hasattr(self.logger, 'log_dir') else '/tmp'
            }

            # Create one actor per GPU
            actors = []
            for i in range(num_actors):
                gpu_id = i % len(available_gpus)
                self.logger.log_info(f"  - Actor {i} -> GPU {available_gpus[gpu_id]}")
                actors.append(TTRLVRPipelineActor.remote(self.config, logger_config, gpu_id))

            # Distribute and run the work
            futures = []
            for i, problem_id in enumerate(problem_ids):
                actor_idx = i % num_actors
                future = actors[actor_idx].process_problem.remote(
                    benchmark_config, problem_id, round_num, self.session_timestamp
                )
                futures.append(future)

            # Collect the results
            self.logger.log_info(f"⏳ Waiting for {len(futures)} parallel tasks to complete...")
            results_list = ray.get(futures)

            # Build the results dictionary
            results = {}
            for problem_id, result in results_list:
                results[problem_id] = result

            self.logger.log_info(f"✅ Parallel processing completed: {len(results)} problems processed")
            return results

        except Exception as e:
            self.logger.log_error(f"💥 Parallel processing failed: {e}")
            self.logger.log_info("📝 Falling back to sequential processing")
            return self._process_problems_sequential(benchmark_config, problem_ids, round_num)

    def _find_actual_training_data(self) -> Optional[str]:
        """Find the most recently generated actual training data directory."""
        try:
            # Search recently created directories under tmp/batch_results
            base_path = "/home/ubuntu/RLVR/TestTime-RLVR-v2/tmp/batch_results"

            # Look for azr_training_data folders
            import glob
            search_pattern = os.path.join(base_path, "**/azr_training_data")
            data_dirs = glob.glob(search_pattern, recursive=True)

            if not data_dirs:
                return None

            # Pick the most recently modified directory
            latest_dir = max(data_dirs, key=os.path.getmtime)

            # Check that parquet files actually exist
            task_files = ['induction.parquet', 'deduction.parquet', 'abduction.parquet']
            parquet_count = 0
            for task_file in task_files:
                if os.path.exists(os.path.join(latest_dir, task_file)):
                    parquet_count += 1

            if parquet_count > 0:
                return latest_dir
            else:
                return None

        except Exception as e:
            self.logger.log_error(f"Error finding actual training data: {e}")
            return None
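

# --- Usage sketch (illustrative, not part of the trainer) -------------------
# A minimal sketch of how this trainer might be driven, assuming TestTimeConfig
# and BenchmarkConfig accept the constructor arguments shown; the real fields
# live in absolute_zero_reasoner.testtime.config and may differ. Wrapping the
# run in try/finally guarantees cleanup() runs even if a round crashes.
if __name__ == "__main__":
    config = TestTimeConfig(model_name="Qwen/Qwen2.5-7B")  # hypothetical field values
    benchmark = BenchmarkConfig(name="humaneval")          # hypothetical field values
    trainer = IterativeTrainer(
        config=config,
        batch_epochs=1,
        save_every_round=False,
        save_round_interval=5,
    )
    try:
        results = trainer.run_iterative_training(
            benchmark_config=benchmark,
            problem_ids=["HumanEval/0", "HumanEval/1"],  # hypothetical problem IDs
            total_rounds=30,
        )
        print(json.dumps({'success': results.get('success')}, indent=2))
    finally:
        trainer.cleanup()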