# TTRLVR-AZR 통합 계획서 ## 개요 TTRLVR을 AZR 방식으로 완전 통합하여 하나의 VeRL 세션에서 모든 Phase를 처리하도록 재구조화 ## 1. 전체 구조 변경 ### 현재 구조 (분리형) ``` train_ttrlvr_azr.py ├── for round in rounds: │ ├── Phase 1-4: RemoteTestTimePipeline (독립 vLLM) │ │ ├── Step 1: 프로그램 생성 │ │ ├── Step 2: I/O 쌍 생성 │ │ ├── Step 3: Task 생성 │ │ └── Step 4: 검증 │ ├── ray.kill(pipeline) # vLLM 삭제 │ └── Phase 5: VeRL Training (새 vLLM) │ ├── trainer.init_workers() # 매 라운드마다 │ └── trainer.fit() # 1 epoch ``` ### 목표 구조 (통합형) ``` train_ttrlvr_azr_unified.py ├── trainer = UnifiedTTRLVRTrainer() ├── trainer.init_workers() # 1번만! └── trainer.fit() └── for round in rounds: # 내부에서 처리 ├── Phase 1-4: 데이터 생성 (같은 vLLM) └── Phase 5: 학습 (같은 vLLM) ``` ## 2. 파일별 수정 계획 ### 2.1 새로운 파일 생성 #### `/test/trainer/unified_ttrlvr_trainer.py` ```python """ 통합 TTRLVR Trainer - 모든 Phase를 하나의 세션에서 처리 """ from absolute_zero_reasoner.trainer.ppo.azr_ray_trainer import CodeIORayPPOTrainer class UnifiedTTRLVRTrainer(CodeIORayPPOTrainer): def __init__(self, ttrlvr_config, problem_ids, total_rounds, *args, **kwargs): super().__init__(*args, **kwargs) self.ttrlvr_config = ttrlvr_config self.problem_ids = problem_ids self.total_rounds = total_rounds self.current_round = 0 def fit(self): """메인 학습 루프 - 라운드별 처리 포함""" # 로거 설정 logger = self._setup_logger() # 전체 라운드 반복 for round_num in range(1, self.total_rounds + 1): self.current_round = round_num # Phase 1-4: 데이터 생성 round_data = self._generate_round_data() # Phase 5: 1 epoch 학습 self._train_one_round(round_data) # 체크포인트 저장 if round_num % 5 == 0: self._save_checkpoint() def _generate_round_data(self): """Phase 1-4를 VeRL 내부에서 처리""" # 기존 TestTimePipeline 로직을 이곳으로 이동 pass ``` ### 2.2 기존 파일 수정 #### `/test/train_ttrlvr_azr.py` → `/test/train_ttrlvr_azr_unified.py` 변경 전: ```python # 복잡한 라운드별 처리 trainer = IterativeTrainer(...) for round in rounds: # Phase 1-4 pipeline = RemoteTestTimePipeline(...) data = pipeline.run_complete_pipeline(...) ray.kill(pipeline) # Phase 5 trainer.train_with_data(data) ``` 변경 후: ```python # 단순화된 메인 로직 from trainer.unified_ttrlvr_trainer import UnifiedTTRLVRTrainer # 설정 config = load_config() trainer = UnifiedTTRLVRTrainer( config=config, problem_ids=problem_ids, total_rounds=args.rounds, tokenizer=tokenizer, ... ) # 한 번만 초기화 trainer.init_workers() # 모든 라운드 처리 trainer.fit() ``` #### `/test/utils/testtime_pipeline.py` 로직 이동 기존 Phase 1-4 로직을 UnifiedTTRLVRTrainer로 이동: ```python class UnifiedTTRLVRTrainer(CodeIORayPPOTrainer): def _generate_programs(self, problem_data): """Step 1: 프로그램 생성 - TestTimePipeline에서 이동""" prompt = self._create_program_prompt(problem_data) # VeRL의 vLLM 사용! prompts_proto = DataProto.from_dict({ "input_ids": tokenize(prompt), "attention_mask": ... }) # 기존 actor_rollout_wg 사용 outputs = self.actor_rollout_wg.generate_sequences(prompts_proto) return self._parse_programs(outputs) def _generate_io_pairs(self, programs): """Step 2: I/O 생성 - TestTimePipeline에서 이동""" # 같은 방식으로 구현 pass ``` ### 2.3 설정 파일 수정 #### `/test/configs/ttrlvr_azr_unified.yaml` ```yaml # 통합 설정 actor_rollout_ref: rollout: # dummy_dtensor 사용 가능 (같은 vLLM 계속 사용) load_format: dummy_dtensor # TTRLVR 특화 설정 ttrlvr: # Phase 1-4 설정 num_programs: 4 input_generation_rounds: 3 # Phase 5 설정 train_batch_size: 8 epochs_per_round: 1 # 라운드당 1 epoch ``` ## 3. 구현 상세 ### 3.1 UnifiedTTRLVRTrainer 전체 구현 ```python # /test/trainer/unified_ttrlvr_trainer.py import os import json import torch import pandas as pd from typing import List, Dict, Any, Optional from datetime import datetime import numpy as np from verl import DataProto from verl.utils.py_utils import merge_dict from absolute_zero_reasoner.trainer.ppo.azr_ray_trainer import CodeIORayPPOTrainer from absolute_zero_reasoner.testtime.config import BenchmarkConfig from absolute_zero_reasoner.testtime.execution import PythonExecutor class UnifiedTTRLVRTrainer(CodeIORayPPOTrainer): """ TTRLVR의 모든 Phase를 하나의 VeRL 세션에서 처리하는 통합 Trainer """ def __init__( self, ttrlvr_config: Dict[str, Any], benchmark_config: BenchmarkConfig, problem_ids: List[str], total_rounds: int, output_dir: str, *args, **kwargs ): super().__init__(*args, **kwargs) self.ttrlvr_config = ttrlvr_config self.benchmark_config = benchmark_config self.problem_ids = problem_ids self.total_rounds = total_rounds self.output_dir = output_dir self.current_round = 0 # Phase 1-4용 설정 self.num_programs = ttrlvr_config.get('num_programs', 4) self.input_rounds = ttrlvr_config.get('input_generation_rounds', 3) self.parallel_batch_size = ttrlvr_config.get('parallel_batch_size', 4) # Python 실행기 self.executor = PythonExecutor(timeout_length=10) def fit(self): """ 통합 학습 루프 - AZR의 fit()을 확장하여 라운드별 처리 """ # 기본 로거 설정 from verl.utils.tracking import Tracking logger = Tracking( project_name=self.config.trainer.project_name, experiment_name=self.config.trainer.experiment_name, default_backend=self.config.trainer.logger, config=self.config, tags=self.config.trainer.wandb_tags, entity=self.config.trainer.wandb.entity, wandb_run_id=self.config.trainer.wandb_run_id, ) # 전체 라운드 반복 for round_num in range(1, self.total_rounds + 1): self.current_round = round_num logger.log({"round": round_num}) print(f"\n{'='*80}") print(f"🔄 Round {round_num}/{self.total_rounds}") print(f"{'='*80}") # Phase 1-4: 데이터 생성 round_start = datetime.now() round_data = self._generate_round_data() data_gen_time = (datetime.now() - round_start).total_seconds() print(f"✅ Data generation completed in {data_gen_time:.2f}s") print(f"📊 Generated {len(round_data)} training examples") # 데이터를 parquet 파일로 저장 self._save_round_data(round_data, round_num) # Phase 5: PPO 학습 (1 epoch) train_start = datetime.now() metrics = self._train_one_round(round_data, logger) train_time = (datetime.now() - train_start).total_seconds() print(f"✅ Training completed in {train_time:.2f}s") # 메트릭 로깅 logger.log({ "round_time/data_generation": data_gen_time, "round_time/training": train_time, "round_time/total": data_gen_time + train_time, **metrics }) # 체크포인트 저장 if round_num % 5 == 0: self._save_checkpoint() def _generate_round_data(self) -> List[Dict[str, Any]]: """ Phase 1-4: 현재 모델로 라운드 데이터 생성 """ all_tasks = [] for problem_id in self.problem_ids: print(f"\n📝 Processing problem: {problem_id}") try: # Step 1: 프로그램 생성 programs = self._generate_programs(problem_id) print(f" ✓ Generated {len(programs)} programs") # Step 2: I/O 쌍 생성 io_pairs = self._generate_io_pairs(problem_id, programs) print(f" ✓ Generated {len(io_pairs)} I/O pairs") # Step 3: Task 생성 tasks = self._create_reasoning_tasks(problem_id, programs, io_pairs) print(f" ✓ Created {len(tasks)} tasks") # Step 4: 검증 valid_tasks = self._validate_tasks(tasks) print(f" ✓ Validated {len(valid_tasks)}/{len(tasks)} tasks") all_tasks.extend(valid_tasks) except Exception as e: print(f" ✗ Error processing {problem_id}: {e}") continue return all_tasks def _generate_programs(self, problem_id: str) -> List[str]: """ Step 1: 다양한 프로그램 생성 VeRL의 vLLM을 사용하여 생성 """ # 문제 데이터 로드 problem_data = self._load_problem_data(problem_id) # 프롬프트 생성 prompt = f"""You are given a programming problem. Generate {self.num_programs} different solutions. Problem: {problem_data['description']} Generate {self.num_programs} different Python solutions:""" # 토큰화 input_ids = self.tokenizer( prompt, return_tensors="pt", padding=True, truncation=True, max_length=self.config.data.max_prompt_length ).input_ids # DataProto 생성 prompts_proto = DataProto.from_dict({ "input_ids": input_ids.cuda(), "attention_mask": torch.ones_like(input_ids).cuda(), "position_ids": torch.arange(input_ids.size(1)).unsqueeze(0).cuda() }) # 메타 정보 추가 prompts_proto.meta_info = { "eos_token_id": self.tokenizer.eos_token_id, "pad_token_id": self.tokenizer.pad_token_id, "temperature": 0.8, # 다양성을 위해 높은 temperature "do_sample": True, "top_p": 0.95, "response_length": 512 } # VeRL의 vLLM으로 생성! outputs = self.actor_rollout_wg.generate_sequences(prompts_proto) # 프로그램 추출 programs = [] generated_text = self.tokenizer.decode( outputs.batch["input_ids"][0], skip_special_tokens=True ) # 프로그램 파싱 (코드 블록 추출) code_blocks = self._extract_code_blocks(generated_text) programs.extend(code_blocks[:self.num_programs]) return programs def _generate_io_pairs( self, problem_id: str, programs: List[str] ) -> List[Dict[str, Any]]: """ Step 2: 프로그램들로부터 I/O 쌍 생성 """ io_pairs = [] for program in programs: # 각 프로그램에 대해 여러 입력 생성 for round_idx in range(self.input_rounds): prompt = f"""Given this Python function, generate {5} test inputs. Function: ```python {program} ``` Generate {5} different test inputs as a Python list:""" # 입력 생성 inputs = self._generate_with_vllm(prompt, temperature=0.7) # 각 입력에 대해 출력 계산 for test_input in inputs: try: output = self.executor.execute(program, test_input) if output['success']: io_pairs.append({ 'input': test_input, 'output': output['result'], 'program': program }) except: continue return io_pairs def _create_reasoning_tasks( self, problem_id: str, programs: List[str], io_pairs: List[Dict[str, Any]] ) -> List[Dict[str, Any]]: """ Step 3: Induction, Deduction, Abduction task 생성 """ tasks = [] for io_pair in io_pairs: # Induction: I/O → Program tasks.append({ 'problem_id': problem_id, 'task_type': 'induction', 'input': io_pair['input'], 'output': io_pair['output'], 'target': io_pair['program'], 'prompt': self._create_induction_prompt(io_pair) }) # Deduction: Program + Input → Output tasks.append({ 'problem_id': problem_id, 'task_type': 'deduction', 'input': io_pair['input'], 'program': io_pair['program'], 'target': io_pair['output'], 'prompt': self._create_deduction_prompt(io_pair) }) # Abduction: Program + Output → Input tasks.append({ 'problem_id': problem_id, 'task_type': 'abduction', 'program': io_pair['program'], 'output': io_pair['output'], 'target': io_pair['input'], 'prompt': self._create_abduction_prompt(io_pair) }) return tasks def _train_one_round( self, round_data: List[Dict[str, Any]], logger ) -> Dict[str, float]: """ Phase 5: 한 라운드의 PPO 학습 """ # 데이터를 VeRL 형식으로 변환 train_dataset = self._convert_to_verl_dataset(round_data) # 현재 dataloader 업데이트 self.train_dataloader = self._create_dataloader( train_dataset, self.val_dataset, self.collate_fn, self.train_sampler ) # 1 epoch 학습 epoch_metrics = {} for step, batch in enumerate(self.train_dataloader): # 배치 준비 gen_batch = self._prepare_generation_batch(batch) # 시퀀스 생성 (같은 vLLM 사용!) gen_batch_output = self.actor_rollout_wg.generate_sequences(gen_batch) # 리워드 계산 batch = batch.union(gen_batch_output) reward_tensor = self.reward_fn(batch) # PPO 업데이트 update_metrics = self._ppo_update(batch, reward_tensor) # 메트릭 수집 for k, v in update_metrics.items(): if k not in epoch_metrics: epoch_metrics[k] = [] epoch_metrics[k].append(v) # 평균 메트릭 계산 avg_metrics = { k: np.mean(v) for k, v in epoch_metrics.items() } return avg_metrics def _generate_with_vllm( self, prompt: str, temperature: float = 0.7 ) -> Any: """ 헬퍼 함수: VeRL의 vLLM을 사용한 텍스트 생성 """ # 토큰화 input_ids = self.tokenizer( prompt, return_tensors="pt", padding=True, truncation=True ).input_ids # DataProto 생성 prompts_proto = DataProto.from_dict({ "input_ids": input_ids.cuda(), "attention_mask": torch.ones_like(input_ids).cuda(), }) prompts_proto.meta_info = { "eos_token_id": self.tokenizer.eos_token_id, "pad_token_id": self.tokenizer.pad_token_id, "temperature": temperature, "do_sample": True, "response_length": 256 } # 생성 outputs = self.actor_rollout_wg.generate_sequences(prompts_proto) # 디코딩 generated_text = self.tokenizer.decode( outputs.batch["input_ids"][0], skip_special_tokens=True ) return self._parse_output(generated_text) def _save_round_data(self, round_data: List[Dict], round_num: int): """라운드 데이터를 parquet 파일로 저장""" output_dir = os.path.join(self.output_dir, f"round_{round_num}") os.makedirs(output_dir, exist_ok=True) # Task 타입별로 분리 for task_type in ['induction', 'deduction', 'abduction']: tasks = [t for t in round_data if t['task_type'] == task_type] if tasks: df = pd.DataFrame(tasks) df.to_parquet(os.path.join(output_dir, f"{task_type}.parquet")) ``` ### 3.2 데이터 흐름 상세 ```python # 실제 데이터가 흐르는 과정 # Round 1 시작 trainer.current_round = 1 # 1. 프로그램 생성 programs = trainer._generate_programs("Mbpp/1") # → trainer.actor_rollout_wg.generate_sequences() 호출 # → FSDP 모델의 가중치가 vLLM에 동기화됨 (첫 번째) # → 출력: ["def solve(x): return x*2", "def solve(x): return 2*x", ...] # 2. I/O 생성 io_pairs = trainer._generate_io_pairs("Mbpp/1", programs) # → 같은 vLLM 사용 (동기화 건너뜀 - base_sync_done=True) # → 출력: [{"input": 5, "output": 10}, {"input": 3, "output": 6}, ...] # 3. Task 생성 tasks = trainer._create_reasoning_tasks(...) # → 메모리에서만 처리 (vLLM 호출 없음) # 4. PPO 학습 trainer._train_one_round(tasks) # → 같은 vLLM으로 response 생성 # → FSDP 모델 업데이트 # → vLLM은 메모리 참조로 자동 업데이트 # Round 2 시작 - 같은 vLLM 계속 사용! ``` ### 3.3 메모리 관리 상세 ```python class UnifiedTTRLVRTrainer(CodeIORayPPOTrainer): def _manage_memory(self): """Phase 전환 시 메모리 관리""" # Ray actor는 유지하면서 GPU 캐시만 정리 torch.cuda.empty_cache() # vLLM의 KV 캐시 정리 (선택적) if hasattr(self.actor_rollout_wg, 'clear_kv_cache'): self.actor_rollout_wg.clear_kv_cache() def _monitor_memory(self): """메모리 사용량 모니터링""" for i in range(torch.cuda.device_count()): allocated = torch.cuda.memory_allocated(i) / 1024**3 reserved = torch.cuda.memory_reserved(i) / 1024**3 print(f"GPU {i}: Allocated={allocated:.2f}GB, Reserved={reserved:.2f}GB") ``` ### 3.4 동기화 메커니즘 상세 ```python # 동기화가 어떻게 보장되는지 # 1. 첫 번째 generate_sequences 호출 with self.rollout_sharding_manager: # __enter__() 호출 # dummy_dtensor 사용 시: # - self.base_sync_done = False (초기값) # - sync_model_weights() 실행 → FSDP → vLLM 동기화 # - self.base_sync_done = True 설정 # 2. 이후 generate_sequences 호출들 with self.rollout_sharding_manager: # __enter__() 호출 # - self.base_sync_done = True # - sync_model_weights() 건너뜀 # - 하지만 같은 vLLM이므로 메모리 참조로 업데이트됨 # 3. 메모리 참조 메커니즘 # FSDP 모델과 vLLM 모델이 같은 tensor를 참조 # FSDP 업데이트 → tensor 값 변경 → vLLM도 자동으로 새 값 사용 ``` ### 3.5 에러 처리 및 복구 ```python class UnifiedTTRLVRTrainer(CodeIORayPPOTrainer): def _safe_generate(self, prompt: str, max_retries: int = 3): """안전한 생성 with 재시도""" for attempt in range(max_retries): try: return self._generate_with_vllm(prompt) except Exception as e: print(f"Generation failed (attempt {attempt+1}): {e}") if attempt == max_retries - 1: raise # GPU 메모리 정리 후 재시도 torch.cuda.empty_cache() time.sleep(1) def _validate_tasks(self, tasks: List[Dict]) -> List[Dict]: """생성된 task 검증""" valid_tasks = [] for task in tasks: if self._is_valid_task(task): valid_tasks.append(task) else: print(f"Invalid task filtered: {task['task_type']}") return valid_tasks ``` ## 4. 마이그레이션 계획 ### Phase 1: 코드 준비 1. UnifiedTTRLVRTrainer 클래스 생성 2. TestTimePipeline 로직 이동 3. 단위 테스트 작성 ### Phase 2: 통합 테스트 1. 소규모 문제로 테스트 (1-2 rounds) 2. 메모리 사용량 모니터링 3. 학습 성능 비교 ### Phase 3: 전환 1. 기존 스크립트 백업 2. 새 스크립트로 교체 3. 전체 학습 실행 ## 5. 예상 결과 ### 장점 - ✅ 동기화 문제 완전 해결 - ✅ 30-40% 빠른 실행 (vLLM 재생성 없음) - ✅ 메모리 효율 20-30% 개선 - ✅ 코드 구조 단순화 ### 단점 및 대응 - ❌ Phase 간 결합도 증가 - → 명확한 인터페이스 정의로 해결 - ❌ 디버깅 복잡도 - → 상세한 로깅 추가 - ❌ 기존 코드와 호환성 - → 두 버전 병행 유지 ## 6. 구현 우선순위 1. **높음**: UnifiedTTRLVRTrainer 기본 구조 2. **높음**: Phase 1-4 로직 이동 3. **중간**: 설정 파일 통합 4. **낮음**: 추가 최적화 ## 7. 테스트 계획 ```bash # 단계별 테스트 # 1. 소규모 테스트 python train_ttrlvr_azr_unified.py --rounds 2 --problems 1 # 2. 중간 테스트 python train_ttrlvr_azr_unified.py --rounds 5 --problems 5 # 3. 전체 테스트 python train_ttrlvr_azr_unified.py --rounds 30 --problems 10 ``` ## 8. 롤백 계획 문제 발생 시: 1. 기존 분리형 구조로 즉시 복귀 가능 2. load_format: dtensor 사용으로 임시 해결 3. 단계적 통합 (Phase 5만 먼저)