"""
TTRLVR + AZR iterative training trainer.

Manages up to 30 rounds of iterative training. Each round:

1. Generates (i, p, o) -> task triples with the TTRLVR pipeline.
2. Trains the actual AZR CodeIORayPPOTrainer on that round's data.
3. Proceeds to the next round with the improved model.
"""

import os
import sys
import json
import pandas as pd
import ray
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional

sys.path.append('/home/ubuntu/RLVR/TestTime-RLVR-v2')
from absolute_zero_reasoner.testtime.complete_pipeline import CompleteTestTimePipeline
from absolute_zero_reasoner.testtime.config import TestTimeConfig, BenchmarkConfig
from absolute_zero_reasoner.testtime.logger import TestTimeLogger

from utils.checkpoint_manager import CheckpointManager
from absolute_zero_reasoner.trainer.ppo.azr_ray_trainer import CodeIORayPPOTrainer
from utils.custom_ray_trainer import CustomCodeIORayPPOTrainer
import hydra
from hydra.core.global_hydra import GlobalHydra


class IterativeTrainer:
    """Manager for TTRLVR + AZR iterative training."""

    def __init__(self, config: TestTimeConfig, logger: Optional[TestTimeLogger] = None,
                 batch_epochs: int = 1, verl_config_path: Optional[str] = None,
                 save_every_round: bool = False, save_round_interval: int = 5):
        self.config = config
        self.logger = logger or TestTimeLogger()
        self.batch_epochs = batch_epochs
        self.verl_config_path = verl_config_path
        self.save_every_round = save_every_round
        self.save_round_interval = save_round_interval

        # GPU detection and execution mode
        self.available_gpus = self._detect_available_gpus()
        self.execution_mode = self._determine_execution_mode()
        self.logger.log_info(f"Detected {len(self.available_gpus)} GPUs: {self.available_gpus}")
        self.logger.log_info(f"Execution mode: {self.execution_mode}")

        self.complete_pipeline = None

        self.checkpoint_manager = CheckpointManager(logger=self.logger)

        # Model bookkeeping
        self.original_model_name = config.model_name
        self.current_model_path = config.model_name
        self.current_model = None
        self.round_results = {}
        self.checkpoint_dir = "/data/RLVR/checkpoints/ttrlvr_azr"

        # Ray actor running the TTRLVR data-generation pipeline
        self.remote_pipeline = None

        # VeRL trainer state
        self.verl_trainer = None
        self.verl_config = None
        self.ray_initialized = False

        # Timing
        self.start_time = None
        self.round_times = {}

    def cleanup(self):
        """Clean up the Ray cluster and related resources."""
        try:
            self.logger.log_info("Starting cleanup process...")

            # VeRL trainer
            if hasattr(self, 'verl_trainer') and self.verl_trainer is not None:
                try:
                    self.logger.log_info(" - Cleaning up VeRL trainer...")
                    if hasattr(self.verl_trainer, 'shutdown'):
                        self.verl_trainer.shutdown()
                    self.verl_trainer = None
                except Exception as e:
                    self.logger.log_warning(f" - VeRL trainer cleanup warning: {e}")

            # Remote TTRLVR pipeline actor
            if self.remote_pipeline is not None:
                try:
                    self.logger.log_info(" - Killing remote pipeline actor...")
                    ray.kill(self.remote_pipeline)
                except Exception:
                    pass
                self.remote_pipeline = None

            # Ray cluster
            if self.ray_initialized and ray.is_initialized():
                self.logger.log_info(" - Shutting down Ray cluster...")

                # Kill any remaining named actors before shutdown
                try:
                    actors = ray.util.list_named_actors()
                    if actors:
                        self.logger.log_info(f" - Found {len(actors)} named actors to kill")
                        for actor in actors:
                            try:
                                ray.kill(ray.get_actor(actor['name']))
                            except Exception:
                                pass
                except Exception:
                    pass

                ray.shutdown()
                self.ray_initialized = False

                # Give Ray a moment to release resources
                import time
                time.sleep(2)

                self.logger.log_info("Ray cluster shutdown complete")

            # GPU memory
            try:
                import torch
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                    self.logger.log_info(" - GPU memory cleared")
            except Exception:
                pass

        except Exception as e:
            self.logger.log_error(f"Error during cleanup: {e}")
            # Last-resort shutdown
            try:
                ray.shutdown()
            except Exception:
                pass
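
    # Callers are expected to invoke cleanup() themselves. A defensive pattern
    # (illustrative only, not used elsewhere in this module) is to register it
    # at construction time:
    #
    #     import atexit
    #     trainer = IterativeTrainer(config)
    #     atexit.register(trainer.cleanup)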

    def run_iterative_training(self, benchmark_config: BenchmarkConfig,
                               problem_ids: List[str],
                               total_rounds: int = 30,
                               resume_from_round: int = 1) -> Dict[str, Any]:
        """Main loop for the 30-round iterative training."""

        self.start_time = datetime.now()
        self.session_timestamp = self.start_time.strftime('%Y%m%d_%H%M%S')

        self.logger.log_info("Starting TTRLVR + AZR iterative training")
        self.logger.log_info(f"Configuration: {len(problem_ids)} problems, {total_rounds} rounds")
        self.logger.log_info(f"Problems: {problem_ids}")
        self.logger.log_info(f"Session timestamp: {self.session_timestamp}")

        # Resume from a checkpoint if requested
        if resume_from_round > 1:
            self.logger.log_info(f"Resuming from round {resume_from_round}")
            checkpoint_model = self._load_checkpoint(resume_from_round - 1)
            if checkpoint_model:
                self.current_model_path = checkpoint_model

        training_results = {
            'start_time': self.start_time.isoformat(),
            'session_timestamp': self.session_timestamp,
            'benchmark': benchmark_config.name,
            'problem_ids': problem_ids,
            'total_rounds': total_rounds,
            'resume_from_round': resume_from_round,
            'rounds': {},
            'success': False,
            'error': None
        }

        try:
            for round_num in range(resume_from_round, total_rounds + 1):
                round_start_time = datetime.now()
                self.logger.log_info("=" * 80)
                self.logger.log_info(f"ROUND {round_num}/{total_rounds} - Starting")
                self.logger.log_info(f"Current model: {self.current_model_path}")
                self.logger.log_info("=" * 80)

                # Steps 1-4: TTRLVR data generation for this round
                round_result = self._run_single_round(
                    benchmark_config, problem_ids, round_num
                )

                round_end_time = datetime.now()
                round_duration = (round_end_time - round_start_time).total_seconds()
                self.round_times[round_num] = round_duration

                round_result['duration_seconds'] = round_duration
                round_result['model_before'] = self.current_model_path
                training_results['rounds'][round_num] = round_result

                if not round_result['success']:
                    self.logger.log_error(f"Round {round_num} failed: {round_result.get('error', 'Unknown error')}")
                    continue

                # Step 5: AZR training on this round's data
                if round_result['training_data_files']:
                    self.logger.log_info(f"Starting AZR training for round {round_num}")
                    new_model_path = self._train_azr_with_round_data(
                        round_result['training_data_files'], round_num
                    )

                    if new_model_path:
                        self.current_model_path = new_model_path
                        round_result['model_after'] = new_model_path
                        self.logger.log_info(f"Round {round_num} completed successfully")
                        self.logger.log_info(f"New model: {new_model_path}")

                        # Push the trained weights into the VLLM Ray actor for the next round
                        if hasattr(self, 'remote_pipeline') and self.remote_pipeline is not None:
                            self.logger.log_info("Updating VLLM Ray Actor weights with trained model")
                            import ray
                            update_success = ray.get(self.remote_pipeline.update_model_weights.remote(new_model_path))
                            if update_success:
                                self.logger.log_info("VLLM weights updated successfully for next round")
                            else:
                                self.logger.log_warning("Failed to update VLLM weights, using old model")
                    else:
                        self.logger.log_error(f"AZR training failed for round {round_num}")
                        round_result['training_error'] = "AZR training failed"

                # Periodic checkpoint every 5 rounds
                if round_num % 5 == 0:
                    self._save_checkpoint(round_num, self.current_model_path, training_results)
                    self.logger.log_info(f"Checkpoint saved for round {round_num}")

                self._log_round_summary(round_num, round_result, round_duration)

            training_results['success'] = True
            training_results['end_time'] = datetime.now().isoformat()
            training_results['total_duration_seconds'] = (datetime.now() - self.start_time).total_seconds()
            training_results['final_model'] = self.current_model_path

            self.logger.log_info("TTRLVR + AZR iterative training completed successfully!")

            # Final VeRL trainer cleanup
            if hasattr(self, 'verl_trainer') and self.verl_trainer is not None:
                self.logger.log_info("Cleaning up VeRL Trainer...")
                try:
                    if hasattr(self.verl_trainer, 'cleanup'):
                        self.verl_trainer.cleanup()
                    self.verl_trainer = None
                except Exception as cleanup_error:
                    self.logger.log_warning(f"Cleanup warning: {cleanup_error}")

            self._log_final_summary(training_results)

            return training_results

        except Exception as e:
            self.logger.log_error(f"Iterative training failed: {e}")
            import traceback
            traceback.print_exc()
            return {
                'success': False,
                'error': str(e),
                'rounds': self.round_results
            }

    def run_verl_training_only(self, training_data_path: str, round_num: int = 1,
                               experiment_name: Optional[str] = None) -> Dict[str, Any]:
        """
        Run only the VeRL training stage (Step 5) in isolation.

        Performs VeRL PPO training on data already produced by steps 1-4.

        Args:
            training_data_path: Path to the TTRLVR-generated training data (parquet files)
            round_num: Round number (for logging)
            experiment_name: Experiment name (optional)

        Returns:
            Dictionary of training results
        """
        try:
            self.logger.log_info("Starting VeRL training ONLY (Step 5)")
            self.logger.log_info("=" * 80)
            self.logger.log_info(f"Training data path: {training_data_path}")
            self.logger.log_info(f"Round: {round_num}")

            if not hasattr(self, 'verl_config') or self.verl_config is None:
                self.logger.log_info("Loading VeRL config for standalone training")
                self._load_verl_config()

            if not os.path.exists(training_data_path):
                raise FileNotFoundError(f"Training data path not found: {training_data_path}")

            parquet_files = list(Path(training_data_path).glob("*.parquet"))
            if not parquet_files:
                raise FileNotFoundError(f"No parquet files found in: {training_data_path}")

            self.verl_config.data.train_files = [str(f) for f in parquet_files]
            self.verl_config.data.val_files = [str(f) for f in parquet_files[:1]]

            self.logger.log_info(f"Found {len(parquet_files)} training files")
            for i, f in enumerate(parquet_files):
                self.logger.log_info(f"  {i+1}. {f.name}")

            if not hasattr(self, 'verl_trainer') or self.verl_trainer is None:
                self.logger.log_info("Initializing VeRL trainer with actual training data")

                import torch
                torch.cuda.empty_cache()

                self._initialize_verl_trainer(training_data_path)
                self.logger.log_info(f"FSDP model loaded on GPU {self.available_gpus}")
                self.logger.log_info("GPU sharing enabled: VLLM + FSDP on same GPUs")
            else:
                # Reuse the existing trainer, pointing it at the new data files
                self.logger.log_info("Updating existing VeRL trainer with new data files")

                self.verl_trainer.config.data.train_files = self.verl_config.data.train_files
                self.verl_trainer.config.data.val_files = self.verl_config.data.val_files

                self.logger.log_info("Re-initializing VeRL workers with new data...")
                self.verl_trainer.init_workers()
                self.logger.log_info("VeRL workers re-initialized with actual training data")

            if experiment_name:
                self.verl_config.experiment.name = experiment_name
            else:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                self.verl_config.experiment.name = f"verl_only_round_{round_num}_{timestamp}"

            self.logger.log_info(f"Experiment: {self.verl_config.experiment.name}")

            self.logger.log_info("Starting VeRL PPO training...")
            start_time = datetime.now()

            try:
                if hasattr(self, 'verl_trainer') and self.verl_trainer is not None:
                    # Directory for raw LLM rollouts produced during training
                    llm_responses_dir = os.path.join(os.path.dirname(training_data_path), "llm_responses")
                    os.makedirs(llm_responses_dir, exist_ok=True)
                    self.logger.log_info(f"LLM responses will be saved to: {llm_responses_dir}")

                    self.verl_trainer.config.trainer.rollout_data_dir = llm_responses_dir

                    self.llm_responses_dir = llm_responses_dir
                    self.response_counter = 0

                    self.logger.log_info("Running VeRL PPO training...")
                    self.verl_trainer.fit()

                    training_result = {'success': True, 'model_path': self.current_model_path}
                    self.logger.log_info("VeRL training completed successfully")

                    # Convert the rollout dumps into TTRLVR-format text files
                    if hasattr(self, 'llm_responses_dir') and os.path.exists(self.llm_responses_dir):
                        self.logger.log_info("Converting VeRL outputs to TTRLVR format...")
                        jsonl_files = list(Path(self.llm_responses_dir).glob("*.jsonl"))
                        for jsonl_file in jsonl_files:
                            self._convert_jsonl_to_ttrlvr_format(str(jsonl_file), self.llm_responses_dir)
                        self.logger.log_info(f"Converted {len(jsonl_files)} JSONL files to TTRLVR format")
                else:
                    raise ValueError("VeRL trainer not initialized")
            except Exception as e:
                self.logger.log_error(f"VeRL training failed: {e}")
                training_result = {'success': False, 'error': str(e)}

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            self.logger.log_info(f"VeRL training completed in {duration:.1f} seconds")

            result = {
                'success': training_result.get('success', False),
                'round': round_num,
                'experiment_name': self.verl_config.experiment.name,
                'training_data_path': training_data_path,
                'duration_seconds': duration,
                'start_time': start_time.isoformat(),
                'end_time': end_time.isoformat(),
                'training_files': len(parquet_files),
                # training_result is a dict, so use .get() rather than getattr()
                'model_path': training_result.get('model_path', self.current_model_path),
                'details': training_result
            }

            if result['success']:
                self.logger.log_info("VeRL training completed successfully!")
                if 'model_path' in training_result:
                    self.current_model_path = training_result['model_path']
                    self.logger.log_info(f"Updated model path: {self.current_model_path}")
            else:
                self.logger.log_error("VeRL training failed")
                self.logger.log_error(f"Error: {training_result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            self.logger.log_error(f"VeRL-only training failed: {e}")
            import traceback
            traceback.print_exc()
            return {
                'success': False,
                'error': str(e),
                'round': round_num,
                'training_data_path': training_data_path
            }
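
    # Standalone usage sketch for Step 5 only (illustrative; the config object
    # and data path are placeholders):
    #
    #     trainer = IterativeTrainer(
    #         config,
    #         verl_config_path="/home/ubuntu/RLVR/TestTime-RLVR-v2/test/configs/ttrlvr_azr_ppo.yaml",
    #     )
    #     result = trainer.run_verl_training_only(
    #         training_data_path="/tmp/ttrlvr_azr_training/round_1",
    #         round_num=1,
    #     )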

    def _process_single_round(self, benchmark_config: BenchmarkConfig,
                              problem_ids: List[str], round_num: int) -> Dict[str, Any]:
        """Process a single round."""

        round_start_time = datetime.now()
        self.logger.log_info(f"ROUND {round_num} - Starting")

        try:
            return self._run_single_round(benchmark_config, problem_ids, round_num)

        except Exception as e:
            self.logger.log_error(f"Round {round_num} failed: {e}")
            return {
                'success': False,
                'error': str(e),
                'round': round_num,
                'duration_seconds': (datetime.now() - round_start_time).total_seconds()
            }

    def _run_single_round(self, benchmark_config: BenchmarkConfig,
                          problem_ids: List[str], round_num: int) -> Dict[str, Any]:
        """Run a single round: the TTRLVR pipeline, using Ray for execution."""

        round_result = {
            'round_num': round_num,
            'problems': {},
            'training_data_files': [],
            'success': False,
            'error': None,
            'stats': {
                'total_problems': len(problem_ids),
                'successful_problems': 0,
                'failed_problems': 0,
                'total_tasks': 0,
                'tasks_by_type': {'induction': 0, 'deduction': 0, 'abduction': 0}
            }
        }

        try:
            if not hasattr(self, 'verl_config') or self.verl_config is None:
                self.logger.log_info("Loading VeRL config for Ray settings")
                self._load_verl_config()

            if not self.ray_initialized:
                self.logger.log_info("Initializing Ray for data generation")
                self._initialize_ray_cluster()

            # The VeRL trainer itself is created lazily in Step 5, once real data exists
            if round_num == 1:
                self.logger.log_info("VeRL trainer will be initialized in Step 5 with actual data")
                self.logger.log_info("GPU sharing plan: VLLM (GPU 1,2) + FSDP (GPU 0,1,2,3)")

            self._update_pipeline_model(self.current_model_path)

            successful_problems = 0

            # Problems are processed one at a time; batching happens inside VLLM
            self.logger.log_info(f"Using sequential processing for {len(problem_ids)} problems")
            self.logger.log_info(" - Multi-problem parallelization disabled")
            self.logger.log_info(" - Single-problem VLLM batch processing enabled")
            results = self._process_problems_sequential(benchmark_config, problem_ids, round_num)

            for problem_id, pipeline_result in results.items():
                round_result['problems'][problem_id] = pipeline_result

                if pipeline_result['success']:
                    successful_problems += 1

                    if 'azr_training_data' in pipeline_result:
                        round_result['training_data_files'].append({
                            'problem_id': problem_id,
                            'files': pipeline_result['azr_training_data']
                        })

                    if 'azr_data_saving' in pipeline_result['steps']:
                        total_tasks = pipeline_result['steps']['azr_data_saving']['total_tasks']
                        round_result['stats']['total_tasks'] += total_tasks

                    self.logger.log_info(f"{problem_id} completed successfully")
                else:
                    self.logger.log_error(f"{problem_id} failed: {pipeline_result.get('error', 'Unknown error')}")

            round_result['stats']['successful_problems'] = successful_problems
            round_result['stats']['failed_problems'] = len(problem_ids) - successful_problems
            round_result['success'] = successful_problems > 0

            if successful_problems == 0:
                round_result['error'] = "No problems completed successfully"

            return round_result

        except Exception as e:
            round_result['error'] = str(e)
            return round_result

    def _initialize_pipeline(self):
        """Initialize the pipeline as a Ray actor (VeRL pattern)."""

        if self.remote_pipeline is None:
            try:
                ttrlvr_config = self.config

                # Choose the data-generation engine from the VeRL rollout config
                if hasattr(self, 'verl_config') and self.verl_config and hasattr(self.verl_config, 'actor_rollout_ref'):
                    rollout_name = self.verl_config.actor_rollout_ref.rollout.name
                    use_vllm = (rollout_name == "vllm")
                else:
                    use_vllm = (self.execution_mode == "distributed")

                ttrlvr_config.use_vllm_for_data_generation = use_vllm
                engine_name = "vllm" if use_vllm else "huggingface"
                self.logger.log_info(f"TTRLVR data generation using: {engine_name} (execution_mode: {self.execution_mode})")

                self.logger.log_info(f"Config debug: num_program_variations = {ttrlvr_config.num_program_variations}")
                self.logger.log_info(f"Config debug: skip_task_evaluation = {getattr(ttrlvr_config, 'skip_task_evaluation', False)}")

                from absolute_zero_reasoner.testtime.complete_pipeline import RemoteTestTimePipeline
                gpu_count = len(self.available_gpus)
                self.logger.log_info(f"Creating RemoteTestTimePipeline with {gpu_count} GPUs, model: {self.current_model_path}")

                # Propagate the log file path into the Ray worker, if available
                import os
                if hasattr(self.logger, 'log_file_path') and self.logger.log_file_path:
                    runtime_env = {
                        "env_vars": {
                            "TTRLVR_LOG_FILE": self.logger.log_file_path,
                            "CUDA_VISIBLE_DEVICES": os.environ.get("CUDA_VISIBLE_DEVICES", "0")
                        }
                    }
                    self.logger.log_info(f"Setting TTRLVR_LOG_FILE for Ray worker: {self.logger.log_file_path}")
                else:
                    runtime_env = {
                        "env_vars": {
                            "CUDA_VISIBLE_DEVICES": os.environ.get("CUDA_VISIBLE_DEVICES", "0")
                        }
                    }
                    self.logger.log_warning("Logger does not have log_file_path attribute, Ray worker will create its own log file")

                # Pin VLLM to the first two visible GPUs; FSDP may later share all of them
                if gpu_count >= 2:
                    cuda_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "0,1,2,3").split(',')
                    vllm_gpus = f"{cuda_devices[0]},{cuda_devices[1]}"
                else:
                    vllm_gpus = os.environ.get("CUDA_VISIBLE_DEVICES", "0")

                runtime_env["env_vars"]["CUDA_VISIBLE_DEVICES"] = str(vllm_gpus)
                runtime_env["env_vars"]["VLLM_USE_SPECIFIC_GPUS"] = str(vllm_gpus)

                self.logger.log_info("Creating Ray Actor without exclusive GPU allocation")
                self.logger.log_info(f" - VLLM will use GPUs: {vllm_gpus} (via CUDA_VISIBLE_DEVICES)")
                self.logger.log_info(f" - FSDP can use all GPUs: {os.environ.get('CUDA_VISIBLE_DEVICES', '0,1,2,3')}")

                # No num_gpus option here: GPU visibility is controlled entirely
                # via CUDA_VISIBLE_DEVICES so that VLLM and FSDP can share devices
                RemoteTestTimePipelineWithGPUs = RemoteTestTimePipeline.options(
                    num_cpus=4,
                    runtime_env=runtime_env
                )
                self.remote_pipeline = RemoteTestTimePipelineWithGPUs.remote(
                    config=ttrlvr_config,
                    model_path=self.current_model_path
                )

                self.logger.log_info(f"RemoteTestTimePipeline initialized in {self.execution_mode} mode")
                if self.execution_mode == "distributed":
                    self.logger.log_info(" - Using VLLM distributed inference on GPU 0,1")
                    self.logger.log_info(" - FSDP can use all GPUs: 0,1,2,3 with sharing")
                    self.logger.log_info(" - Model loading handled inside Ray worker")
                else:
                    self.logger.log_info(" - Using HuggingFace single GPU inference inside Ray worker")
            except Exception as e:
                self.logger.log_error(f"Failed to initialize pipeline: {e}")
                raise

    def _update_pipeline_model(self, model_path: str):
        """Update the pipeline's model reference (the in-memory model itself is kept)."""

        try:
            if self.remote_pipeline is None:
                self._initialize_pipeline()

            self.current_model_path = model_path
            self.logger.log_info(f"Pipeline model path updated to: {model_path}")

            # If the path actually changed, recreate the actor with the new model
            if hasattr(self, '_last_model_path') and self._last_model_path != model_path:
                self.logger.log_info("Model path changed, recreating Ray Actor")
                self.remote_pipeline = None
                self._initialize_pipeline()

            self._last_model_path = model_path

        except Exception as e:
            self.logger.log_warning(f"Failed to update pipeline model: {e}")

    def _train_azr_with_round_data(self, training_data_files: List[Dict[str, Any]],
                                   round_num: int) -> Optional[str]:
        """Update the in-memory model with this round's data."""

        try:
            combined_data_path = self._combine_round_data(training_data_files, round_num)
            if not combined_data_path:
                self.logger.log_error(f"Failed to combine training data for round {round_num}")
                return None

            self.logger.log_info(f"Combined training data: {combined_data_path}")

            updated_model = self._update_model_in_memory(combined_data_path, round_num)

            if updated_model:
                self.current_model = updated_model

                # Keep all pipeline components pointing at the same model instance
                if self.complete_pipeline:
                    self.complete_pipeline.model = self.current_model
                    if hasattr(self.complete_pipeline, 'solution_generator') and self.complete_pipeline.solution_generator:
                        self.complete_pipeline.solution_generator.model = self.current_model
                    if hasattr(self.complete_pipeline, 'ipo_extractor') and self.complete_pipeline.ipo_extractor:
                        self.complete_pipeline.ipo_extractor.model = self.current_model

                virtual_model_path = f"memory://round_{round_num}_model"
                self.logger.log_info(f"Model updated in memory for round {round_num}")
                self.logger.log_info(f"Virtual model path: {virtual_model_path}")
                return virtual_model_path
            else:
                self.logger.log_error(f"Model update failed for round {round_num}")
                return None

        except Exception as e:
            self.logger.log_error(f"Model update execution failed: {e}")
            return None

    def _update_model_in_memory(self, training_data_path: str, round_num: int) -> Optional[Any]:
        """Update the model in memory directly via VeRL (AZR REINFORCE++ training)."""

        try:
            self.logger.log_info(f"Starting VeRL-based AZR training for round {round_num}")
            self.logger.log_info(f"Training data: {training_data_path}")

            # Check which per-task files exist
            task_files = ['induction.parquet', 'deduction.parquet', 'abduction.parquet']
            available_files = []

            for task_file in task_files:
                file_path = os.path.join(training_data_path, task_file)
                if os.path.exists(file_path):
                    available_files.append(task_file)
                    file_size = os.path.getsize(file_path)
                    self.logger.log_info(f"  {task_file}: {file_size} bytes")

            if not available_files:
                self.logger.log_warning("No training data files found in specified directory")

                self.logger.log_info("Searching for actual training data...")
                actual_data_path = self._find_actual_training_data()
                if actual_data_path:
                    self.logger.log_info(f"Found actual training data: {actual_data_path}")
                    training_data_path = actual_data_path

                    for task_file in task_files:
                        file_path = os.path.join(training_data_path, task_file)
                        if os.path.exists(file_path):
                            available_files.append(task_file)
                            file_size = os.path.getsize(file_path)
                            self.logger.log_info(f"  {task_file}: {file_size} bytes")
                else:
                    self.logger.log_error("No actual training data found anywhere")
                    return None

            self.logger.log_info(f"Processing {len(available_files)} task types")

            # Release VLLM actor memory before Step 5 so FSDP can use the GPUs
            if hasattr(self, 'remote_pipeline') and self.remote_pipeline is not None:
                self.logger.log_info("Releasing VLLM Ray Actor memory before Step 5...")
                try:
                    cleanup_result = ray.get(self.remote_pipeline.cleanup.remote())
                    if cleanup_result:
                        self.logger.log_info("VLLM internal resources cleaned up")

                    ray.kill(self.remote_pipeline)
                    self.remote_pipeline = None
                    self.logger.log_info("VLLM Ray Actor killed successfully")

                    import torch
                    torch.cuda.empty_cache()

                    import time
                    time.sleep(2)

                    # Log per-GPU memory after the release
                    if torch.cuda.is_available():
                        for i in range(torch.cuda.device_count()):
                            memory_allocated = torch.cuda.memory_allocated(i) / 1024**3
                            memory_reserved = torch.cuda.memory_reserved(i) / 1024**3
                            self.logger.log_info(f"  GPU {i}: Allocated={memory_allocated:.2f}GB, Reserved={memory_reserved:.2f}GB")
                except Exception as e:
                    self.logger.log_warning(f"Error during VLLM cleanup: {e}")

            # Create the VeRL trainer on first use, otherwise just repoint its data
            if self.verl_trainer is None:
                self._initialize_verl_trainer(training_data_path)
            else:
                self._update_verl_trainer_data(training_data_path)

            if self.verl_trainer is None:
                self.logger.log_error("Failed to initialize VeRL trainer")
                return self.current_model

            self.logger.log_info(f"Starting in-memory VeRL training for round {round_num}")

            # Directory for raw LLM rollouts
            llm_responses_dir = os.path.join(os.path.dirname(training_data_path), "llm_responses")
            os.makedirs(llm_responses_dir, exist_ok=True)
            self.logger.log_info(f"LLM responses will be saved to: {llm_responses_dir}")

            self.verl_config.trainer.rollout_data_dir = llm_responses_dir

            # Optionally run multiple epochs over this round's batch
            if hasattr(self, 'batch_epochs') and self.batch_epochs > 1:
                original_epochs = self.verl_config.trainer.total_epochs
                self.verl_config.trainer.total_epochs = self.batch_epochs
                self.logger.log_info(f"Adjusted epochs from {original_epochs} to {self.batch_epochs}")

            # Derive PPO batch sizes from the data configuration
            train_batch_size = self.verl_config.data.train_batch_size
            problem_types = getattr(self.verl_config.azr, 'problem_types', ['code_i', 'code_o', 'code_f'])
            train_propose = getattr(self.verl_config.azr, 'train_propose', False)

            original_ppo_mini_batch_size = self.verl_config.actor_rollout_ref.actor.ppo_mini_batch_size

            # One mini-batch per problem type; doubled when proposer training is on
            calculated_ppo_mini_batch_size = train_batch_size * len(problem_types) * (2 if train_propose else 1)
            self.verl_config.actor_rollout_ref.actor.ppo_mini_batch_size = calculated_ppo_mini_batch_size

            update_iteration = getattr(self.verl_config.azr.data_selection_strategy, 'update_iteration', 1)
            self.verl_config.azr.data_selection_strategy.data_len = train_batch_size * update_iteration

            self.logger.log_info(f"Auto-calculated ppo_mini_batch_size: {original_ppo_mini_batch_size} -> {calculated_ppo_mini_batch_size}")
            self.logger.log_info(f" - train_batch_size: {train_batch_size}")
            self.logger.log_info(f" - problem_types: {len(problem_types)} ({problem_types})")
            self.logger.log_info(f" - train_propose: {train_propose}")
            self.logger.log_info(f"Auto-calculated data_len: {self.verl_config.azr.data_selection_strategy.data_len}")
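
            # Worked example (illustrative numbers only): with train_batch_size=8,
            # the three default problem types ['code_i', 'code_o', 'code_f'], and
            # train_propose=False, ppo_mini_batch_size = 8 * 3 * 1 = 24; with
            # train_propose=True it doubles to 48. With update_iteration=1,
            # data_len stays at 8.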

            self.logger.log_info(f"Calling verl_trainer.fit() for round {round_num}")
            self.logger.log_info(f"Config - total_epochs: {self.verl_config.trainer.total_epochs}")
            self.logger.log_info(f"Config - train_batch_size: {self.verl_config.data.train_batch_size}")
            self.logger.log_info(f"Config - total_training_steps: {self.verl_config.trainer.total_training_steps}")

            if hasattr(self.verl_trainer, 'config'):
                self.verl_trainer.config.trainer.rollout_data_dir = llm_responses_dir

            fit_start = datetime.now()
            self.verl_trainer.fit()
            fit_end = datetime.now()
            fit_duration = (fit_end - fit_start).total_seconds()

            self.logger.log_info(f"verl_trainer.fit() completed in {fit_duration:.3f} seconds")

            # Convert the rollout dumps into TTRLVR-format text files
            if os.path.exists(llm_responses_dir):
                self.logger.log_info("Converting VeRL outputs to TTRLVR format...")
                jsonl_files = list(Path(llm_responses_dir).glob("*.jsonl"))
                for jsonl_file in jsonl_files:
                    self._convert_jsonl_to_ttrlvr_format(str(jsonl_file), llm_responses_dir)
                self.logger.log_info(f"Converted {len(jsonl_files)} JSONL files to TTRLVR format")

            self.logger.log_info(f"Model updated successfully with REINFORCE++ for round {round_num}")

            if self._should_save_checkpoint(round_num):
                self._save_round_checkpoint(round_num)

            # The actual weights live inside the VeRL workers; expose a marker here
            if self.current_model is None:
                self.current_model = "verl_trained_model"
            return self.current_model

        except Exception as e:
            self.logger.log_error(f"Failed to update model in memory: {e}")
            import traceback
            traceback.print_exc()
            return None

    def _combine_round_data(self, training_data_files: List[Dict[str, Any]],
                            round_num: int) -> Optional[str]:
        """Combine every problem's data for the round into per-task files."""

        try:
            output_dir = f"/tmp/ttrlvr_azr_training/round_{round_num}"
            os.makedirs(output_dir, exist_ok=True)

            combined_data = {
                'induction': [],
                'deduction': [],
                'abduction': []
            }

            total_files_processed = 0

            for problem_data in training_data_files:
                problem_id = problem_data['problem_id']
                files = problem_data['files']

                self.logger.log_info(f"Processing data for {problem_id}")

                for task_type in ['induction', 'deduction', 'abduction']:
                    if task_type in files:
                        file_path = files[task_type]

                        if os.path.exists(file_path):
                            try:
                                df = pd.read_parquet(file_path)
                                task_data = df.to_dict('records')
                                combined_data[task_type].extend(task_data)
                                total_files_processed += 1

                                self.logger.log_info(f"  {task_type}: {len(task_data)} tasks from {file_path}")
                            except Exception as e:
                                self.logger.log_warning(f"  Failed to read {file_path}: {e}")
                        else:
                            self.logger.log_warning(f"  File not found: {file_path}")

            if total_files_processed == 0:
                self.logger.log_error("No training data files found to combine")
                return None

            # Write one parquet per task type, sorted by IPO group
            combined_files = {}
            total_tasks = 0

            for task_type, data in combined_data.items():
                if data:
                    df = pd.DataFrame(data)
                    df = df.sort_values('ipo_group_id')

                    file_path = os.path.join(output_dir, f"{task_type}.parquet")
                    df.to_parquet(file_path, index=False)

                    combined_files[task_type] = file_path
                    total_tasks += len(data)

                    self.logger.log_info(f"Saved {len(data)} {task_type} tasks to {file_path}")
                else:
                    self.logger.log_warning(f"No {task_type} tasks found for round {round_num}")

            # Persist round-level statistics next to the data
            stats = {
                'round': round_num,
                'total_tasks': total_tasks,
                'tasks_by_type': {k: len(v) for k, v in combined_data.items()},
                'files': combined_files,
                'problems_processed': len(training_data_files),
                'batch_groups': len(set(
                    task['ipo_group_id']
                    for task_data in combined_data.values()
                    for task in task_data
                ))
            }

            stats_file = os.path.join(output_dir, 'round_training_stats.json')
            with open(stats_file, 'w') as f:
                json.dump(stats, f, indent=2)

            self.logger.log_info(f"Round {round_num} data summary:")
            self.logger.log_info(f" - Total tasks: {total_tasks}")
            self.logger.log_info(f" - Batch groups: {stats['batch_groups']}")
            self.logger.log_info(f" - Files: {list(combined_files.keys())}")

            return output_dir

        except Exception as e:
            self.logger.log_error(f"Failed to combine round data: {e}")
            return None
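
    # Expected on-disk layout after a successful _combine_round_data call
    # (illustrative, for round 3):
    #
    #   /tmp/ttrlvr_azr_training/round_3/
    #       induction.parquet
    #       deduction.parquet
    #       abduction.parquet
    #       round_training_stats.json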

    def _save_checkpoint(self, round_num: int, model_path: str,
                         training_results: Dict[str, Any]):
        """Save a checkpoint (model state, training statistics, round info)."""

        try:
            checkpoint_path = os.path.join(self.checkpoint_dir, f"checkpoint_round_{round_num}")
            os.makedirs(checkpoint_path, exist_ok=True)

            checkpoint_data = {
                'round_num': round_num,
                'model_path': model_path,
                'timestamp': datetime.now().isoformat(),
                'total_rounds': training_results.get('total_rounds', 30),
                'completed_rounds': round_num,
                'training_results': training_results,
                'round_times': self.round_times
            }

            checkpoint_file = os.path.join(checkpoint_path, 'checkpoint.json')
            with open(checkpoint_file, 'w') as f:
                json.dump(checkpoint_data, f, indent=2)

            # Human-readable summary alongside the JSON
            summary_file = os.path.join(checkpoint_path, 'summary.txt')
            with open(summary_file, 'w') as f:
                f.write(f"TTRLVR + AZR Training Checkpoint - Round {round_num}\n")
                f.write("=" * 60 + "\n\n")
                f.write(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Completed Rounds: {round_num}/{training_results.get('total_rounds', 30)}\n")
                f.write(f"Current Model: {model_path}\n")
                f.write(f"Total Training Time: {sum(self.round_times.values()):.1f} seconds\n\n")

                f.write("Round Statistics:\n")
                f.write("-" * 20 + "\n")
                for r_num, r_time in self.round_times.items():
                    if r_num <= round_num:
                        round_result = training_results['rounds'].get(r_num, {})
                        success = "OK" if round_result.get('success', False) else "FAIL"
                        f.write(f"Round {r_num:2d}: {success} ({r_time:.1f}s)\n")

            self.logger.log_info(f"Checkpoint saved: {checkpoint_path}")

        except Exception as e:
            self.logger.log_error(f"Failed to save checkpoint: {e}")

    def _load_checkpoint(self, round_num: int) -> Optional[str]:
        """Load the model path from a checkpoint."""

        try:
            checkpoint_path = os.path.join(self.checkpoint_dir, f"checkpoint_round_{round_num}")
            checkpoint_file = os.path.join(checkpoint_path, 'checkpoint.json')

            if not os.path.exists(checkpoint_file):
                self.logger.log_warning(f"Checkpoint not found: {checkpoint_file}")
                return None

            with open(checkpoint_file, 'r') as f:
                checkpoint_data = json.load(f)

            model_path = checkpoint_data.get('model_path')
            if model_path and os.path.exists(model_path):
                self.logger.log_info(f"Loaded checkpoint from round {round_num}")
                self.logger.log_info(f"Model: {model_path}")

                # Restore per-round timing statistics as well
                if 'round_times' in checkpoint_data:
                    self.round_times.update(checkpoint_data['round_times'])

                return model_path
            else:
                self.logger.log_warning(f"Model path in checkpoint does not exist: {model_path}")
                return None

        except Exception as e:
            self.logger.log_error(f"Failed to load checkpoint: {e}")
            return None

    def _log_round_summary(self, round_num: int, round_result: Dict[str, Any],
                           duration: float):
        """Log a summary of the completed round."""

        stats = round_result.get('stats', {})

        self.logger.log_info("")
        self.logger.log_info(f"ROUND {round_num} SUMMARY")
        self.logger.log_info("=" * 50)
        self.logger.log_info(f"Duration: {duration:.1f} seconds")
        self.logger.log_info(f"Problems: {stats.get('successful_problems', 0)}/{stats.get('total_problems', 0)} successful")
        self.logger.log_info(f"Total tasks: {stats.get('total_tasks', 0)}")

        tasks_by_type = stats.get('tasks_by_type', {})
        for task_type, count in tasks_by_type.items():
            if count > 0:
                self.logger.log_info(f" - {task_type}: {count}")

        if round_result.get('success'):
            self.logger.log_info(f"Round {round_num} completed successfully")
        else:
            self.logger.log_info(f"Round {round_num} failed")

        self.logger.log_info("")

    def _log_final_summary(self, training_results: Dict[str, Any]):
        """Log a summary of the whole training run."""

        total_duration = training_results.get('total_duration_seconds', 0)
        total_rounds = training_results.get('total_rounds', 0)
        completed_rounds = len(training_results.get('rounds', {}))

        self.logger.log_info("")
        self.logger.log_info("TTRLVR + AZR TRAINING COMPLETED")
        self.logger.log_info("=" * 60)
        self.logger.log_info(f"Total Duration: {total_duration:.1f} seconds ({total_duration/3600:.1f} hours)")
        self.logger.log_info(f"Completed Rounds: {completed_rounds}/{total_rounds}")
        self.logger.log_info(f"Final Model: {training_results.get('final_model', 'N/A')}")

        successful_rounds = 0
        failed_rounds = 0

        for round_result in training_results['rounds'].values():
            if round_result.get('success'):
                successful_rounds += 1
            else:
                failed_rounds += 1

        self.logger.log_info("Round Statistics:")
        self.logger.log_info(f" - Successful: {successful_rounds}")
        self.logger.log_info(f" - Failed: {failed_rounds}")
        # Guard against division by zero when no round completed
        if completed_rounds:
            self.logger.log_info(f" - Success Rate: {successful_rounds/completed_rounds*100:.1f}%")

        if self.round_times:
            avg_round_time = sum(self.round_times.values()) / len(self.round_times)
            self.logger.log_info(f"Average Round Time: {avg_round_time:.1f} seconds")

        self.logger.log_info("")
        self.logger.log_info(f"All results saved to: {self.checkpoint_dir}")
        self.logger.log_info("Training completed successfully!")
        self.logger.log_info("")

    def _initialize_verl_trainer(self, training_data_path: str):
        """Initialize the VeRL trainer and Ray cluster on the first round."""

        try:
            self.logger.log_info("Initializing VeRL trainer for AZR training")

            if not self.ray_initialized:
                self.logger.log_info("Initializing Ray cluster for first time")
                self._initialize_ray_cluster()
            else:
                self.logger.log_info("Using existing Ray cluster")

            if not hasattr(self, 'verl_config') or self.verl_config is None:
                self._load_verl_config()

            train_files = [
                os.path.join(training_data_path, "induction.parquet"),
                os.path.join(training_data_path, "deduction.parquet"),
                os.path.join(training_data_path, "abduction.parquet")
            ]

            valid_train_files = [f for f in train_files if os.path.exists(f)]
            self.verl_config.data.train_files = valid_train_files
            self.verl_config.data.val_files = valid_train_files

            # VeRL always loads the original HF model; in-memory weights use a
            # virtual memory:// path that VeRL cannot load directly
            self.verl_config.actor_rollout_ref.model.path = self.original_model_name
            self.logger.log_info(f"VeRL config set to original model path: {self.original_model_name}")

            inference_engine = getattr(self.verl_config.data, 'ttrlvr_inference_engine', 'vllm')
            self.logger.log_info(f"TTRLVR inference engine: {inference_engine}")

            self.logger.log_info("VeRL config loaded successfully")
            self.logger.log_info(f"Training data files: {len(self.verl_config.data.train_files)}")

            from transformers import AutoTokenizer
            from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role

            import ray
            import logging

            # Route VeRL's own loggers into our log file
            verl_loggers = [
                "verl",
                "verl.workers",
                "verl.trainer",
                "verl.workers.fsdp_workers",
                "verl.workers.sharding_manager",
                "absolute_zero_reasoner.trainer.ppo"
            ]

            if hasattr(self.logger, 'log_file_path') and self.logger.log_file_path:
                file_handler = logging.FileHandler(self.logger.log_file_path)
                file_handler.setFormatter(logging.Formatter('[VeRL] %(asctime)s - %(name)s - %(levelname)s - %(message)s'))

                for logger_name in verl_loggers:
                    verl_logger = logging.getLogger(logger_name)
                    verl_logger.setLevel(logging.INFO)
                    verl_logger.addHandler(file_handler)

            # Pick worker classes based on the actor strategy
            strategy = self.verl_config.actor_rollout_ref.actor.strategy
            self.logger.log_info(f"Actor strategy: {strategy}")

            if strategy in ["fsdp", "fsdp2"]:
                from verl.single_controller.ray import RayWorkerGroup
                from verl.workers.fsdp_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker

                actor_rollout_cls = AsyncActorRolloutRefWorker if self.verl_config.actor_rollout_ref.rollout.mode == "async" else ActorRolloutRefWorker
                ray_worker_group_cls = RayWorkerGroup
            elif strategy == "none":
                # Single-GPU configuration: reuse the FSDP workers without FSDP
                self.logger.log_info("Using single GPU configuration (FSDP workers without FSDP)")
                from verl.single_controller.ray import RayWorkerGroup
                from verl.workers.fsdp_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker

                actor_rollout_cls = AsyncActorRolloutRefWorker if self.verl_config.actor_rollout_ref.rollout.mode == "async" else ActorRolloutRefWorker
                ray_worker_group_cls = RayWorkerGroup
            else:
                raise NotImplementedError(f"Strategy '{strategy}' not supported. Supported: fsdp, fsdp2, none")

            # The virtual memory:// path cannot be used for tokenizer loading
            if self.current_model_path.startswith('memory://'):
                tokenizer_path = self.original_model_name
                self.logger.log_info(f"Using original model path for tokenizer: {self.original_model_name}")
            else:
                tokenizer_path = self.current_model_path

            tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token

            role_worker_mapping = {
                Role.ActorRollout: ray.remote(actor_rollout_cls),
                Role.Critic: ray.remote(CriticWorker),
            }

            # All roles share one global resource pool
            global_pool_id = "global_pool"
            resource_pool_spec = {
                global_pool_id: [self.verl_config.trainer.n_gpus_per_node] * self.verl_config.trainer.nnodes,
            }
            mapping = {
                Role.ActorRollout: global_pool_id,
                Role.Critic: global_pool_id,
            }

            resource_pool_manager = ResourcePoolManager(
                resource_pool_spec=resource_pool_spec,
                mapping=mapping
            )

            self.logger.log_info("Pre-calculating total_training_steps before trainer creation")

            # Estimate dataloader sizes from the parquet files
            import pandas as pd
            task_files = ['induction.parquet', 'deduction.parquet', 'abduction.parquet']
            task_dataloader_sizes = []

            for task_file in task_files:
                file_path = os.path.join(training_data_path, task_file)
                if os.path.exists(file_path):
                    df = pd.read_parquet(file_path)
                    train_batch_size = self.verl_config.data.train_batch_size
                    task_dataloader_size = (len(df) + train_batch_size - 1) // train_batch_size
                    task_dataloader_sizes.append(task_dataloader_size)
                    self.logger.log_info(f"  {task_file}: {len(df)} samples -> {task_dataloader_size} batches")

            # The joint dataloader is bounded by the smallest per-task dataloader
            if task_dataloader_sizes:
                estimated_dataloader_size = min(task_dataloader_sizes)
                estimated_total_training_steps = estimated_dataloader_size * self.verl_config.trainer.total_epochs
                self.logger.log_info(f"  Min dataloader size: {estimated_dataloader_size}, Total steps: {estimated_total_training_steps}")
            else:
                estimated_dataloader_size = 0
                estimated_total_training_steps = 100

            self.logger.log_info("Pre-calculated training steps:")
            self.logger.log_info(f" - Task dataloader sizes: {task_dataloader_sizes}")
            self.logger.log_info(f" - Min dataloader size: {estimated_dataloader_size}")
            self.logger.log_info(f" - Total epochs: {self.verl_config.trainer.total_epochs}")
            self.logger.log_info(f" - Estimated total_training_steps: {estimated_total_training_steps}")

            # Inject the estimate into the config before the trainer is built
            from omegaconf import OmegaConf, open_dict
            OmegaConf.set_struct(self.verl_config, True)
            with open_dict(self.verl_config):
                self.verl_config.actor_rollout_ref.actor.optim.total_training_steps = estimated_total_training_steps
                self.verl_config.trainer.total_training_steps = estimated_total_training_steps

                if hasattr(self.verl_config, 'critic') and self.verl_config.critic.get('include_critic', False):
                    self.verl_config.critic.optim.total_training_steps = estimated_total_training_steps

            self.logger.log_info(f"Injected total_training_steps={estimated_total_training_steps} into config before trainer creation")

            actor_value = OmegaConf.select(self.verl_config, "actor_rollout_ref.actor.optim.total_training_steps")
            trainer_value = OmegaConf.select(self.verl_config, "trainer.total_training_steps")
            self.logger.log_info(f"Verification: actor.optim.total_training_steps = {actor_value}")
            self.logger.log_info(f"Verification: trainer.total_training_steps = {trainer_value}")

            self.logger.log_info("Creating new VLLM for VeRL (AZR pattern)")

            self.verl_trainer = CodeIORayPPOTrainer(
                config=self.verl_config,
                tokenizer=tokenizer,
                role_worker_mapping=role_worker_mapping,
                resource_pool_manager=resource_pool_manager,
                ray_worker_group_cls=ray_worker_group_cls
            )

            self.logger.log_info("Initializing VeRL workers...")
            self.verl_trainer.init_workers()
            self.logger.log_info("VeRL workers initialized")

            # Sanity-check the estimate against the real dataloader
            self.logger.log_info("Verifying dataloader after trainer creation:")
            if hasattr(self.verl_trainer, 'train_dataloader'):
                actual_dataloader_size = len(self.verl_trainer.train_dataloader) if self.verl_trainer.train_dataloader else 0
                self.logger.log_info(f" - Actual dataloader size: {actual_dataloader_size}")
                self.logger.log_info(f" - Estimated dataloader size: {estimated_dataloader_size}")

                if actual_dataloader_size != estimated_dataloader_size:
                    self.logger.log_warning(f"Dataloader size mismatch! Estimated: {estimated_dataloader_size}, Actual: {actual_dataloader_size}")
                else:
                    self.logger.log_info("Dataloader size estimation was correct")
            else:
                self.logger.log_warning("No train_dataloader found after trainer creation")

            self._replace_verl_model_with_existing_instance()

            self.logger.log_info("VeRL trainer initialized successfully")

        except Exception as e:
            self.logger.log_error(f"Failed to initialize VeRL trainer: {e}")
            import traceback
            traceback.print_exc()
            self.verl_trainer = None

    def _replace_verl_model_with_existing_instance(self):
        """Swap VeRL's model for the existing instance to save memory."""

        try:
            self.logger.log_info("Replacing VeRL models with existing instance for memory efficiency")

            # Replace the actor's model with the instance we already hold
            if hasattr(self.verl_trainer, 'actor_rollout_ref'):
                if hasattr(self.verl_trainer.actor_rollout_ref, 'actor'):
                    if hasattr(self.verl_trainer.actor_rollout_ref.actor, 'model'):
                        del self.verl_trainer.actor_rollout_ref.actor.model

                        self.verl_trainer.actor_rollout_ref.actor.model = self.current_model
                        self.logger.log_info("Actor model replaced with existing instance")

                # The rollout engine keeps its own copy of the weights; just note it
                if hasattr(self.verl_trainer.actor_rollout_ref, 'rollout'):
                    if hasattr(self.verl_trainer.actor_rollout_ref.rollout, 'llm'):
                        self.logger.log_info("Rollout engine detected - using existing model weights")

            import torch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                current_memory = torch.cuda.memory_allocated() / 1024**3
                self.logger.log_info(f"GPU memory after model replacement: {current_memory:.1f}GB")

            self.logger.log_info("Single model instance now used across all steps (1-5)!")

        except Exception as e:
            self.logger.log_warning(f"Model replacement failed, using default VeRL behavior: {e}")

    def _update_verl_trainer_data(self, training_data_path: str):
        """Update only the data paths on the existing VeRL trainer."""

        try:
            self.logger.log_info("Updating VeRL trainer data for new round")

            new_train_files = [
                os.path.join(training_data_path, "induction.parquet"),
                os.path.join(training_data_path, "deduction.parquet"),
                os.path.join(training_data_path, "abduction.parquet")
            ]

            valid_files = [f for f in new_train_files if os.path.exists(f)]

            if not valid_files:
                self.logger.log_warning("No valid training files found for update")
                return

            self.verl_config.data.train_files = valid_files
            self.verl_config.data.val_files = valid_files

            if hasattr(self.verl_trainer, 'update_data_files'):
                self.verl_trainer.update_data_files(valid_files)
            else:
                self.verl_trainer.config.data.train_files = valid_files
                self.verl_trainer.config.data.val_files = valid_files

            self.logger.log_info(f"Updated training data: {len(valid_files)} files")

            self.logger.log_info("Re-initializing VeRL workers with new data...")
            self.verl_trainer.init_workers()
            self.logger.log_info("VeRL workers re-initialized with actual training data")

        except Exception as e:
            self.logger.log_error(f"Failed to update VeRL trainer data: {e}")
            import traceback
            traceback.print_exc()

    def _load_verl_config(self):
        """Load the VeRL config from the existing YAML file."""

        try:
            if self.verl_config_path:
                config_path = self.verl_config_path
            else:
                config_path = self._get_default_config_path()
                self.logger.log_info(f"Loading VeRL config from: {config_path}")
                self.logger.log_info(f"Config selected for {self.execution_mode} mode")

            from omegaconf import OmegaConf
            import os

            if not os.path.exists(config_path):
                raise FileNotFoundError(f"Config file not found: {config_path}")

            self.verl_config = OmegaConf.load(config_path)

            # Point VeRL at the current model, falling back to the original HF
            # path when the current model only exists in memory
            if hasattr(self, 'current_model_path') and self.current_model_path:
                if self.current_model_path.startswith('memory://'):
                    model_path_for_verl = self.original_model_name
                    self.logger.log_info(f"Using original model path for VeRL: {model_path_for_verl}")
                else:
                    model_path_for_verl = self.current_model_path

                self.verl_config.actor_rollout_ref.model.path = model_path_for_verl
                self.logger.log_info(f"Updated VeRL model path to: {model_path_for_verl}")

            self.logger.log_info("VeRL config loaded successfully from YAML")
            self.logger.log_info(f" - TTRLVR Ray parallel processing: {self.verl_config.data.ttrlvr_ray_config.parallel_processing}")
            self.logger.log_info(f" - TTRLVR inference engine: {self.verl_config.data.ttrlvr_inference_engine}")

        except Exception as e:
            self.logger.log_error(f"Config loading failed: {e}")
            self.verl_config = None
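
    # Minimal sketch of the config keys this loader reads (illustrative only;
    # the real YAML files live under test/configs/):
    #
    #   data:
    #     ttrlvr_inference_engine: vllm
    #     ttrlvr_ray_config:
    #       parallel_processing: false
    #   actor_rollout_ref:
    #     model:
    #       path: <initial HF model name>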

    def _detect_available_gpus(self):
        """Detect the list of usable GPUs."""
        import os
        cuda_visible_devices = os.environ.get('CUDA_VISIBLE_DEVICES', '0')
        if cuda_visible_devices:
            return [int(gpu.strip()) for gpu in cuda_visible_devices.split(',') if gpu.strip()]
        else:
            return [0]
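
    # Example: CUDA_VISIBLE_DEVICES="0,2,3" yields [0, 2, 3]; an empty value
    # falls back to [0].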

    def _determine_execution_mode(self):
        """Determine the execution mode from the GPU count."""
        num_gpus = len(self.available_gpus)

        if num_gpus == 1:
            return "single_gpu"
        else:
            return "distributed"

    def _get_default_config_path(self):
        """Return the default config file path for the execution mode."""
        base_path = "/home/ubuntu/RLVR/TestTime-RLVR-v2/test/configs"

        if self.execution_mode == "single_gpu":
            return f"{base_path}/ttrlvr_azr_ppo.yaml"
        else:
            return f"{base_path}/ttrlvr_azr_ppo_4gpu.yaml"

    def _should_save_checkpoint(self, round_num: int) -> bool:
        """Decide whether to save a checkpoint this round."""
        if self.save_every_round:
            return True

        if round_num % self.save_round_interval == 0:
            return True

        return False

    def _convert_jsonl_to_ttrlvr_format(self, jsonl_path: str, output_dir: str):
        """Convert VeRL's JSONL output into individual TTRLVR-format text files."""
        import json

        try:
            # This method can be reached without run_verl_training_only having
            # set the counter, so initialize it defensively
            if not hasattr(self, 'response_counter'):
                self.response_counter = 0

            with open(jsonl_path, 'r') as f:
                data = json.load(f)
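
            # Assumed dump shape (not verified against VeRL internals): despite
            # the .jsonl extension, each file is read as a single JSON object of
            # parallel lists, e.g.
            #   {"input": [...], "output": [...], "score": [...], "step": [...]}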
|
|
|
|
|
|
for i in range(len(data.get('input', []))): |
|
|
prompt = data['input'][i] if 'input' in data else "" |
|
|
response = data['output'][i] if 'output' in data else "" |
|
|
score = data['score'][i] if 'score' in data else 0.0 |
|
|
|
|
|
|
|
|
task_type = "unknown" |
|
|
if "induction" in prompt.lower() or "input/output pairs" in prompt: |
|
|
task_type = "induction" |
|
|
elif "deduction" in prompt.lower() or "observed output" in prompt: |
|
|
task_type = "deduction" |
|
|
elif "abduction" in prompt.lower() or "which input produces" in prompt: |
|
|
task_type = "abduction" |
|
|
|
|
|
|
|
|
task_dir = os.path.join(output_dir, task_type) |
|
|
os.makedirs(task_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
filename = f"verl_training_{task_type}_{self.response_counter}_response.txt" |
|
|
filepath = os.path.join(task_dir, filename) |
|
|
|
|
|
|
|
|
with open(filepath, 'w') as f: |
|
|
f.write(f"Task Type: {task_type}\n") |
|
|
f.write(f"Task ID: verl_step_{data.get('step', 0)[i]}_{i}\n") |
|
|
f.write(f"Generated: {datetime.now().strftime('%Y%m%d_%H%M%S')}\n") |
|
|
f.write("="*80 + "\n") |
|
|
f.write("ORIGINAL PROMPT:\n") |
|
|
f.write("="*80 + "\n") |
|
|
f.write(prompt + "\n") |
|
|
f.write("="*80 + "\n") |
|
|
f.write("LLM RESPONSE:\n") |
|
|
f.write("="*80 + "\n") |
|
|
f.write(response + "\n") |
|
|
f.write("="*80 + "\n") |
|
|
f.write("REWARD SCORE:\n") |
|
|
f.write("="*80 + "\n") |
|
|
f.write(f"Score: {score:.3f}\n") |
|
|
|
|
|
|
|
|
for key in data.keys(): |
|
|
if key not in ['input', 'output', 'score', 'step'] and isinstance(data[key], list): |
|
|
f.write("="*80 + "\n") |
|
|
f.write(f"{key.upper()}:\n") |
|
|
f.write("="*80 + "\n") |
|
|
f.write(f"{data[key][i] if i < len(data[key]) else 'N/A'}\n") |
|
|
|
|
|
self.response_counter += 1 |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.log_error(f"Failed to convert JSONL to TTRLVR format: {e}") |
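
    # Assumed shape of the VeRL dump consumed above, i.e. one JSON object of
    # parallel per-sample lists (extra list-valued keys are appended verbatim):
    #   {
    #     "input":  ["<prompt 0>", "<prompt 1>", ...],
    #     "output": ["<response 0>", ...],
    #     "score":  [0.75, ...],
    #     "step":   [120, ...]
    #   }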
|
|
|
|
|
    def _save_round_checkpoint(self, round_num: int):
        """Save a VeRL checkpoint at the end of a round."""
        try:
            if hasattr(self, 'verl_trainer') and self.verl_trainer:
                checkpoint_path = f"checkpoint_round_{round_num}"

                # _save_checkpoint() always writes into default_local_dir, so
                # temporarily point it at a per-round subdirectory.
                original_dir = self.verl_trainer.config.trainer.default_local_dir
                round_checkpoint_dir = f"{original_dir}/{checkpoint_path}"
                self.verl_trainer.config.trainer.default_local_dir = round_checkpoint_dir

                self.verl_trainer._save_checkpoint()

                # Restore the original checkpoint directory.
                self.verl_trainer.config.trainer.default_local_dir = original_dir

                self.logger.log_info(f"💾 Round {round_num} checkpoint saved to: {round_checkpoint_dir}")

                return round_checkpoint_dir
            else:
                self.logger.log_warning("⚠️ VeRL trainer not available for checkpoint saving")
                return None

        except Exception as e:
            self.logger.log_error(f"Failed to save round {round_num} checkpoint: {e}")
            import traceback
            traceback.print_exc()
            return None
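
    # Design note: the directory swap above is not exception-safe on its own;
    # if _save_checkpoint() raises, default_local_dir is left pointing at the
    # round subdirectory. Wrapping the swap in try/finally would make the
    # restore unconditional.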
|
|
|
|
|
    def _initialize_ray_cluster(self):
        """Initialize the Ray cluster (once per session)."""
        try:
            # Reuse an existing cluster if one is already up.
            if ray.is_initialized():
                self.logger.log_info("⚠️ Ray already initialized, using existing cluster")
                self.ray_initialized = True
                return

            self.logger.log_info("🚀 Initializing Ray cluster with all GPUs for shared usage")

            cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "0")
            self.logger.log_info(f"🎯 Ray initialization with CUDA_VISIBLE_DEVICES: {cuda_visible_devices}")

            available_gpus = cuda_visible_devices.split(',') if cuda_visible_devices else ['0']
            self.logger.log_info(f"🎯 Available GPUs: {available_gpus} (count: {len(available_gpus)})")

            ray_config = getattr(self.verl_config, 'ray_init', None) if hasattr(self, 'verl_config') and self.verl_config else None

            ray.init(
                runtime_env={"env_vars": {
                    "TOKENIZERS_PARALLELISM": "true",
                    "NCCL_DEBUG": "WARN",
                    "VLLM_LOGGING_LEVEL": "WARN",
                    "VERL_LOGGING_LEVEL": "INFO",
                    "VLLM_ALLOW_RUNTIME_LORA_UPDATING": "true",
                    "CUDA_VISIBLE_DEVICES": cuda_visible_devices
                }},
                num_cpus=ray_config.num_cpus if ray_config else 16,
                ignore_reinit_error=True
            )

            self.ray_initialized = True
            self.logger.log_info("✅ Ray cluster initialized successfully")
            self.logger.log_info(f" - GPUs available via CUDA: {cuda_visible_devices}")
            self.logger.log_info(f" - CPUs: {ray_config.num_cpus if ray_config else 16}")
            self.logger.log_info(" - GPU management: CUDA_VISIBLE_DEVICES (not Ray)")
            self.logger.log_info(" - GPU sharing enabled: VLLM (GPU 0,1) + FSDP (GPU 0,1,2,3)")

        except Exception as e:
            self.logger.log_error(f"Failed to initialize Ray cluster: {e}")
            import traceback
            traceback.print_exc()
            raise
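
    # Design note: ray.init() is called without num_gpus, so GPU placement is
    # governed by the CUDA_VISIBLE_DEVICES value propagated through runtime_env
    # rather than by Ray's own GPU accounting. This is what lets the VLLM
    # rollout workers and the FSDP training workers share physical GPUs instead
    # of Ray fencing one device off per actor.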
|
|
|
|
|
    def _process_problems_sequential(self, benchmark_config: BenchmarkConfig,
                                     problem_ids: List[str], round_num: int) -> Dict[str, Dict]:
        """Process problems one at a time (the original approach)."""
        results = {}

        for i, problem_id in enumerate(problem_ids):
            self.logger.log_info(f"🔄 Processing problem {i+1}/{len(problem_ids)}: {problem_id}")

            try:
                # Lazily create the remote pipeline actor on first use.
                if self.remote_pipeline is None:
                    self._initialize_pipeline()

                pipeline_result = ray.get(self.remote_pipeline.run_complete_pipeline.remote(
                    benchmark_config, problem_id, round_num, self.session_timestamp
                ))

                results[problem_id] = pipeline_result

            except Exception as e:
                self.logger.log_error(f"💥 Failed to process {problem_id}: {e}")
                results[problem_id] = {
                    'success': False,
                    'error': str(e)
                }

        return results
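
    # Minimal usage sketch (the problem ids here are hypothetical, and the
    # 'success' key is only guaranteed on the failure path):
    #   results = trainer._process_problems_sequential(
    #       benchmark_config, ["HumanEval/0", "HumanEval/1"], round_num=1)
    #   failed = [pid for pid, r in results.items()
    #             if not r.get('success', False)]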
|
|
|
|
|
    def _process_problems_parallel(self, benchmark_config: BenchmarkConfig,
                                   problem_ids: List[str], round_num: int) -> Dict[str, Dict]:
        """[DEPRECATED] Parallel problem processing with Ray; currently unused.

        Note: cross-problem parallelism is disabled. Only VLLM batching within
        a single problem is used.
        """
        try:
            ray_config = getattr(self.verl_config.data, 'ttrlvr_ray_config', {}) if hasattr(self, 'verl_config') and self.verl_config else {}

            parallel_enabled = ray_config.get('parallel_processing', False)
            max_concurrent = ray_config.get('max_concurrent_problems', 4)

            if not parallel_enabled or len(problem_ids) <= 1:
                self.logger.log_info("🔄 Using sequential processing (parallel_processing=False or single problem)")
                return self._process_problems_sequential(benchmark_config, problem_ids, round_num)

            self.logger.log_info(f"🚀 Using Ray parallel processing for {len(problem_ids)} problems")
            self.logger.log_info(f" - Max concurrent: {min(max_concurrent, len(problem_ids))}")

            cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "0")
            available_gpus = cuda_visible_devices.split(',') if cuda_visible_devices else ['0']

            @ray.remote(num_gpus=1)
            class TTRLVRPipelineActor:
                def __init__(self, config, logger_config, gpu_id=0):
                    import os
                    import torch

                    # Map the logical gpu_id onto the physical GPU list and pin
                    # this actor to that single device.
                    cuda_devices = os.environ.get('CUDA_VISIBLE_DEVICES', '0')
                    available_gpus = cuda_devices.split(',') if cuda_devices else ['0']
                    actual_gpu = available_gpus[gpu_id % len(available_gpus)]
                    os.environ['CUDA_VISIBLE_DEVICES'] = actual_gpu

                    if torch.cuda.is_available():
                        torch.cuda.set_device(0)
                        print(f"🎯 Actor initialized on GPU {actual_gpu} (local:0)")

                    from absolute_zero_reasoner.testtime.complete_pipeline import CompleteTestTimePipeline
                    from absolute_zero_reasoner.testtime.logger import TestTimeLogger

                    logger = TestTimeLogger(log_dir=logger_config.get('log_dir', '/tmp'))

                    model, tokenizer = self._load_pipeline_model(config)

                    self.pipeline = CompleteTestTimePipeline(
                        model=model,
                        tokenizer=tokenizer,
                        config=config,
                        logger=logger
                    )

                def _load_pipeline_model(self, config):
                    """Load the model independently inside each actor."""
                    from absolute_zero_reasoner.testtime.solution_generator import InitialSolutionGenerator
                    import os

                    use_vllm = getattr(config, 'use_vllm_for_data_generation', True)

                    # CUDA_VISIBLE_DEVICES is already pinned to one device, so
                    # the actor's only visible GPU is cuda:0.
                    device = 'cuda:0'
                    print(f"🚀 Loading model on device {device} (actual GPU: {os.environ.get('CUDA_VISIBLE_DEVICES', 'unknown')})")

                    return InitialSolutionGenerator.load_model_with_optimizations(
                        config.model_name, device, config, use_vllm=use_vllm
                    )

                def process_problem(self, benchmark_config, problem_id, round_num, session_timestamp):
                    """Process a single problem."""
                    try:
                        result = self.pipeline.run_complete_pipeline(
                            benchmark_config, problem_id, round_num, session_timestamp
                        )
                        return problem_id, result
                    except Exception as e:
                        return problem_id, {
                            'success': False,
                            'error': str(e)
                        }

            num_actors = min(max_concurrent, len(problem_ids), len(available_gpus))
            self.logger.log_info(f"🏭 Creating {num_actors} Ray actors across {len(available_gpus)} GPUs")
            self.logger.log_info(f" - Available GPUs: {available_gpus}")
            self.logger.log_info(f" - Debug: max_concurrent={max_concurrent}, len(problem_ids)={len(problem_ids)}, len(available_gpus)={len(available_gpus)}")

            logger_config = {
                'log_dir': self.logger.log_dir if hasattr(self.logger, 'log_dir') else '/tmp'
            }

            actors = []
            for i in range(num_actors):
                gpu_id = i % len(available_gpus)
                self.logger.log_info(f" - Actor {i} -> GPU {available_gpus[gpu_id]}")
                actors.append(TTRLVRPipelineActor.remote(self.config, logger_config, gpu_id))

            # Assign problems to actors round-robin.
            futures = []
            for i, problem_id in enumerate(problem_ids):
                actor_idx = i % num_actors
                future = actors[actor_idx].process_problem.remote(
                    benchmark_config, problem_id, round_num, self.session_timestamp
                )
                futures.append(future)

            self.logger.log_info(f"⏳ Waiting for {len(futures)} parallel tasks to complete...")
            results_list = ray.get(futures)

            results = {}
            for problem_id, result in results_list:
                results[problem_id] = result

            self.logger.log_info(f"✅ Parallel processing completed: {len(results)} problems processed")

            return results

        except Exception as e:
            self.logger.log_error(f"💥 Parallel processing failed: {e}")
            self.logger.log_info("🔄 Falling back to sequential processing")
            return self._process_problems_sequential(benchmark_config, problem_ids, round_num)
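
    # Design note: the deprecated path above is a plain actor-pool pattern: one
    # @ray.remote(num_gpus=1) actor per visible GPU, problems assigned
    # round-robin (actor_idx = i % num_actors), and a single ray.get() over all
    # futures. Per-problem failures are captured inside process_problem and
    # returned as {'success': False, ...} instead of raised, so one bad problem
    # cannot fail the whole batch.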
|
|
|
|
|
    def _find_actual_training_data(self) -> Optional[str]:
        """Find the most recently generated training-data directory."""
        try:
            base_path = "/home/ubuntu/RLVR/TestTime-RLVR-v2/tmp/batch_results"

            # Search recursively for azr_training_data directories.
            import glob
            search_pattern = os.path.join(base_path, "**/azr_training_data")
            data_dirs = glob.glob(search_pattern, recursive=True)

            if not data_dirs:
                return None

            # Pick the most recently modified candidate.
            latest_dir = max(data_dirs, key=os.path.getmtime)

            # Require at least one of the three task parquet files.
            task_files = ['induction.parquet', 'deduction.parquet', 'abduction.parquet']
            parquet_count = 0
            for task_file in task_files:
                if os.path.exists(os.path.join(latest_dir, task_file)):
                    parquet_count += 1

            if parquet_count > 0:
                return latest_dir
            else:
                return None

        except Exception as e:
            self.logger.log_error(f"Error finding actual training data: {e}")
            return None
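
    # Example (paths hypothetical): given
    #   .../batch_results/run_a/p1/azr_training_data/induction.parquet
    #   .../batch_results/run_b/p7/azr_training_data/deduction.parquet
    # the recursive glob matches both azr_training_data directories; the newer
    # one (by mtime) is returned, provided it holds at least one task parquet.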