""" TestTime Logger TestTime RLVR을 위한 포괄적 로깅 시스템 요구사항에 따른 모든 단계별 로그 기록 """ import json import os import time from datetime import datetime from typing import Dict, List, Any, Optional from pathlib import Path import logging class TestTimeLogger: """TestTime RLVR 전용 로거""" def __init__(self, log_dir: str = "logs", log_level: str = "INFO", task_output_dir: str = None, log_file: str = None): # 설계된 구조에 맞는 로그 디렉토리 설정 if task_output_dir: # TTRLVR 통합 모드: 설계된 디렉토리 구조 사용 self.log_dir = Path(task_output_dir) self.use_integrated_structure = True else: # 기존 모드: 기본 logs 디렉토리 사용 self.log_dir = Path(log_dir) self.use_integrated_structure = False self.log_dir.mkdir(parents=True, exist_ok=True) # 디렉토리 구조에 따른 서브 디렉토리 생성 if self.use_integrated_structure: # 설계된 구조: round_N 하위에 세부 디렉토리 (self.log_dir / "current_evaluation").mkdir(exist_ok=True) (self.log_dir / "diverse_programs").mkdir(exist_ok=True) (self.log_dir / "llm_responses").mkdir(exist_ok=True) (self.log_dir / "azr_training_data").mkdir(exist_ok=True) # 기존 구조에서는 서브 디렉토리를 생성하지 않음 (메인 로그 파일만) # 기본 로거 설정 self.logger = logging.getLogger("TestTimeRLVR") self.logger.setLevel(getattr(logging, log_level)) # 핸들러 설정 if not self.logger.handlers: # 파일 핸들러 if log_file: # 특정 로그 파일 경로가 주어진 경우 (Ray worker에서 사용) self.log_file_path = log_file file_handler = logging.FileHandler(log_file, mode='a') # append mode else: # 기본 로그 파일 생성 self.log_file_path = str(self.log_dir / f"testtime_rlvr_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log") file_handler = logging.FileHandler(self.log_file_path) file_handler.setLevel(logging.DEBUG) # 콘솔 핸들러 console_handler = logging.StreamHandler() console_handler.setLevel(getattr(logging, log_level)) # 포맷터 formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) self.logger.addHandler(file_handler) self.logger.addHandler(console_handler) def _get_timestamp(self) -> str: """현재 타임스탬프 반환""" return datetime.now().isoformat() def _save_json_log(self, subdirectory: str, filename: str, data: Dict[str, Any]): """JSON 로그 파일 저장""" if self.use_integrated_structure: # 설계된 구조: 각 카테고리별로 적절한 디렉토리에 저장 if subdirectory == "ipo_extraction": # IPO 추출 로그는 diverse_programs 하위에 별도로 저장 log_path = self.log_dir / "diverse_programs" / f"{filename}.json" elif subdirectory == "task_generation": # Task generation 로그는 round 레벨에 저장 (모든 task 종류 포함) log_path = self.log_dir / f"{filename}.json" elif subdirectory == "problems": log_path = self.log_dir / "current_evaluation" / f"{filename}.json" elif subdirectory == "performance": log_path = self.log_dir / "current_evaluation" / f"{filename}.json" elif subdirectory == "training": log_path = self.log_dir / "azr_training_data" / f"{filename}.json" else: # 기본값 log_path = self.log_dir / subdirectory / f"{filename}.json" else: # 기존 구조 log_path = self.log_dir / subdirectory / f"{filename}.json" # 디렉토리 생성 (없다면) log_path.parent.mkdir(parents=True, exist_ok=True) # 기존 로그 로드 (있다면) if log_path.exists(): with open(log_path, 'r', encoding='utf-8') as f: existing_logs = json.load(f) else: existing_logs = [] # 새 로그 추가 data['timestamp'] = self._get_timestamp() existing_logs.append(data) # 저장 with open(log_path, 'w', encoding='utf-8') as f: json.dump(existing_logs, f, indent=2, ensure_ascii=False) # ============================================================================ # 1. 벤치마크 문제 로깅 (요구사항 1) # ============================================================================ def log_problem_attempt(self, problem: Dict[str, Any], solution: str, is_correct: bool, validation_result: Optional[Dict] = None): """벤치마크 문제와 LLM 답변, 정답 여부 로그""" log_data = { 'problem_id': problem.get('task_id', 'unknown'), 'benchmark': problem.get('benchmark_name', 'unknown'), 'problem_prompt': problem.get('prompt', ''), 'canonical_solution': problem.get('canonical_solution', ''), 'llm_solution': solution, 'is_correct': is_correct, 'validation_result': validation_result or {} } self._save_json_log("problems", f"problem_{problem.get('task_id', 'unknown').replace('/', '_')}", log_data) status = "✅ CORRECT" if is_correct else "❌ INCORRECT" self.logger.info(f"Problem {problem.get('task_id', 'unknown')}: {status}") def log_problem_loaded(self, problem_id: str, benchmark_name: str, method: str = "Original"): """문제 로딩 로그 (EvalPlus/Original 방식 구분)""" self.logger.info(f"Loaded problem {problem_id} from {benchmark_name} ({method} method)") # ============================================================================ # 2. IPO 추출 로깅 (요구사항 2) # ============================================================================ def log_ipo_extraction(self, problem_id: str, extracted_triples: List[Dict], validation_results: List[bool]): """생성된 (i,p,o) 트리플과 검증 결과 로그""" log_data = { 'problem_id': problem_id, 'num_triples': len(extracted_triples), 'triples': extracted_triples, 'validation_results': validation_results, 'valid_triples': sum(validation_results), 'invalid_triples': len(validation_results) - sum(validation_results) } self._save_json_log("ipo_extraction", f"ipo_{problem_id.replace('/', '_')}", log_data) self.logger.info(f"IPO Extraction for {problem_id}: {len(extracted_triples)} triples, " f"{sum(validation_results)} valid") # ============================================================================ # 3. 태스크 생성 로깅 (요구사항 2) # ============================================================================ def log_task_generation(self, problem_id: str, induction_tasks: List[Dict], deduction_tasks: List[Dict], abduction_tasks: List[Dict]): """생성된 induction, deduction, abduction 문제 로그""" log_data = { 'problem_id': problem_id, 'induction_tasks': { 'count': len(induction_tasks), 'tasks': induction_tasks }, 'deduction_tasks': { 'count': len(deduction_tasks), 'tasks': deduction_tasks }, 'abduction_tasks': { 'count': len(abduction_tasks), 'tasks': abduction_tasks }, 'total_tasks': len(induction_tasks) + len(deduction_tasks) + len(abduction_tasks) } self._save_json_log("task_generation", f"tasks_{problem_id.replace('/', '_')}", log_data) total_tasks = log_data['total_tasks'] self.logger.info(f"Task Generation for {problem_id}: {total_tasks} tasks " f"(I:{len(induction_tasks)}, D:{len(deduction_tasks)}, A:{len(abduction_tasks)})") # ============================================================================ # 4. 학습 메트릭 로깅 (요구사항 3, 4) # ============================================================================ def log_task_accuracy(self, problem_id: str, task_type: str, accuracy: float, rewards: List[float], step: int): """induction/deduction/abduction 태스크 정확도와 reward 로그""" log_data = { 'problem_id': problem_id, 'task_type': task_type, # 'induction', 'deduction', 'abduction' 'step': step, 'accuracy': accuracy, 'rewards': rewards, 'avg_reward': sum(rewards) / len(rewards) if rewards else 0.0, 'max_reward': max(rewards) if rewards else 0.0, 'min_reward': min(rewards) if rewards else 0.0 } self._save_json_log("training", f"accuracy_{problem_id.replace('/', '_')}", log_data) self.logger.info(f"Step {step} - {task_type.capitalize()} accuracy: {accuracy:.4f}, " f"avg reward: {log_data['avg_reward']:.4f}") def log_verl_training(self, problem_id: str, step: int, loss: float, learning_rate: float, metrics: Dict[str, Any]): """VeRL 학습 진행 상황 로그""" log_data = { 'problem_id': problem_id, 'step': step, 'loss': loss, 'learning_rate': learning_rate, 'metrics': metrics } self._save_json_log("training", f"verl_{problem_id.replace('/', '_')}", log_data) self.logger.info(f"VeRL Training Step {step}: loss={loss:.6f}, lr={learning_rate:.2e}") # ============================================================================ # 5. 성능 변화 로깅 # ============================================================================ def log_performance_change(self, problem_id: str, cycle: int, before_accuracy: float, after_accuracy: float, improvement: float): """매 사이클별 성능 변화 로그""" log_data = { 'problem_id': problem_id, 'cycle': cycle, 'before_accuracy': before_accuracy, 'after_accuracy': after_accuracy, 'improvement': improvement, 'improvement_percentage': improvement * 100 } self._save_json_log("performance", f"cycle_{problem_id.replace('/', '_')}", log_data) direction = "↗️" if improvement > 0 else "↘️" if improvement < 0 else "→" self.logger.info(f"Cycle {cycle} Performance: {before_accuracy:.4f} → {after_accuracy:.4f} " f"({direction} {improvement:+.4f})") # ============================================================================ # 일반 로깅 # ============================================================================ def log_info(self, message: str): """일반 정보 로그""" self.logger.info(message) def log_error(self, message: str): """에러 로그""" self.logger.error(message) def log_warning(self, message: str): """경고 로그""" self.logger.warning(message) def log_debug(self, message: str): """디버그 로그""" self.logger.debug(message) def get_log_summary(self) -> Dict[str, Any]: """로그 요약 정보 반환""" summary = { 'log_directory': str(self.log_dir), 'subdirectories': { 'problems': len(list((self.log_dir / "problems").glob("*.json"))), 'ipo_extraction': len(list((self.log_dir / "ipo_extraction").glob("*.json"))), 'task_generation': len(list((self.log_dir / "task_generation").glob("*.json"))), 'training': len(list((self.log_dir / "training").glob("*.json"))), 'performance': len(list((self.log_dir / "performance").glob("*.json"))) } } return summary