""" Eodi 캐시 관리자 - 증분 처리를 위한 콘텐츠 기반 캐시 ================================================================ LLM 호출 결과를 로컬에 캐싱하여 동일 입력에 대한 불필요한 재처리를 방지합니다. - Content-addressed cache (SHA256 해시 기반) - 원자적 파일 쓰기 (tempfile + os.replace) - 검증된 결과만 캐시 - 메타데이터 포함 저장 """ import os import json import hashlib import tempfile from pathlib import Path from typing import Optional, Dict, Any from datetime import datetime, timezone from src.core.schema import ExtractedKnowledge # 캐시 버전 - 캐시 구조가 변경되면 버전을 올려 기존 캐시 무효화 CACHE_VERSION = "1.0.0" # 추출기 버전 - 프롬프트/로직 변경 시 버전을 올려 캐시 무효화 EXTRACTOR_VERSION = "1.0.0" class CacheManager: """ 증분 처리 캐시 관리자. 파일 콘텐츠 해시와 모델 설정을 기반으로 추출 결과를 저장/조회합니다. 캐시 키 구성요소: - 파일 내용 (normalized) - 프롬프트 템플릿 해시 - 스키마 버전 - 추출기 버전 - 모델 파라미터 (provider, model_name, temperature) """ def __init__(self, cache_dir: str = ".eodi_cache"): self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) def _normalize_content(self, content: str) -> str: """콘텐츠 정규화 - 줄 끝 문자, 공백 등 표준화""" # 줄 끝 문자 통일 (CRLF -> LF) normalized = content.replace('\r\n', '\n').replace('\r', '\n') # 트레일링 공백 제거 lines = [line.rstrip() for line in normalized.split('\n')] return '\n'.join(lines) def _generate_key( self, file_content: str, prompt_template: str, model_config: Dict[str, Any] ) -> str: """ 고유 캐시 키 생성. Args: file_content: 정규화된 마크다운 콘텐츠 prompt_template: 추출 프롬프트 템플릿 model_config: 모델 설정 (provider, model_name, temperature 등) Returns: SHA256 해시 문자열 """ # 정규화된 콘텐츠 normalized_content = self._normalize_content(file_content) # 모델 설정을 정렬된 JSON 문자열로 변환 (일관된 해시를 위해) sorted_config = json.dumps(model_config, sort_keys=True, ensure_ascii=False) # 프롬프트 템플릿 해시 (동적 부분 제외한 템플릿 구조) prompt_hash = hashlib.sha256(prompt_template.encode('utf-8')).hexdigest()[:16] # 모든 구성요소 결합 combined = ( f"v{CACHE_VERSION}|" f"e{EXTRACTOR_VERSION}|" f"p{prompt_hash}|" f"m{sorted_config}|" f"c{normalized_content}" ) return hashlib.sha256(combined.encode('utf-8')).hexdigest() def _create_cache_entry( self, file_content: str, prompt_template: str, model_config: Dict[str, Any], result: ExtractedKnowledge, validation_warnings: Optional[list] = None ) -> Dict[str, Any]: """캐시 엔트리 생성 - 결과와 메타데이터 포함""" return { "cache_version": CACHE_VERSION, "extractor_version": EXTRACTOR_VERSION, "created_at": datetime.now(timezone.utc).isoformat(), "input_checksum": hashlib.sha256(file_content.encode('utf-8')).hexdigest(), "prompt_hash": hashlib.sha256(prompt_template.encode('utf-8')).hexdigest()[:16], "model_config": model_config, "validation_status": "passed", "validation_warnings": validation_warnings or [], "output": result.model_dump(mode='json') # Pydantic v2 JSON 모드 - Decimal/datetime 안전 } def get_cached_result( self, file_content: str, prompt_template: str, model_config: Dict[str, Any] ) -> Optional[ExtractedKnowledge]: """ 캐시된 결과 조회. Args: file_content: 원본 마크다운 콘텐츠 prompt_template: 프롬프트 템플릿 model_config: 모델 설정 Returns: ExtractedKnowledge 객체 또는 None (캐시 미스) """ key = self._generate_key(file_content, prompt_template, model_config) cache_file = self.cache_dir / f"{key}.json" if not cache_file.exists(): return None try: with open(cache_file, 'r', encoding='utf-8') as f: entry = json.load(f) # 캐시 버전 확인 - 버전 불일치 시 무효화 if entry.get("cache_version") != CACHE_VERSION: print(f"⚠️ 캐시 버전 불일치, 무효화: {entry.get('cache_version')} != {CACHE_VERSION}") cache_file.unlink() # 오래된 캐시 삭제 return None # 추출기 버전 확인 if entry.get("extractor_version") != EXTRACTOR_VERSION: print(f"⚠️ 추출기 버전 불일치, 무효화: {entry.get('extractor_version')} != {EXTRACTOR_VERSION}") cache_file.unlink() return None # 검증 상태 확인 - 실패한 결과는 사용하지 않음 if entry.get("validation_status") != "passed": print(f"⚠️ 캐시된 결과가 검증 실패 상태, 무효화") cache_file.unlink() return None # Pydantic 모델로 변환 return ExtractedKnowledge.model_validate(entry["output"]) except (json.JSONDecodeError, KeyError, ValueError) as e: print(f"⚠️ 캐시 로드 오류 ({key[:12]}...): {e}") # 손상된 캐시 파일 삭제 try: cache_file.unlink() except OSError: pass return None def save_result( self, file_content: str, prompt_template: str, model_config: Dict[str, Any], result: ExtractedKnowledge, validation_warnings: Optional[list] = None ) -> bool: """ 추출 결과를 캐시에 저장 (원자적 쓰기). Args: file_content: 원본 마크다운 콘텐츠 prompt_template: 프롬프트 템플릿 model_config: 모델 설정 result: 검증 완료된 ExtractedKnowledge validation_warnings: 검증 경고 목록 (선택) Returns: 저장 성공 여부 """ key = self._generate_key(file_content, prompt_template, model_config) cache_file = self.cache_dir / f"{key}.json" try: # 캐시 엔트리 생성 entry = self._create_cache_entry( file_content, prompt_template, model_config, result, validation_warnings ) # 원자적 쓰기: tempfile -> os.replace # 이렇게 하면 중간에 프로세스가 죽어도 캐시 파일이 깨지지 않음 fd, temp_path = tempfile.mkstemp( suffix='.json.tmp', dir=self.cache_dir, prefix=f'{key[:8]}_' ) try: with os.fdopen(fd, 'w', encoding='utf-8') as f: json.dump(entry, f, ensure_ascii=False, indent=2) # 원자적 교체 (POSIX 시스템에서 atomic) os.replace(temp_path, cache_file) return True except Exception as e: # 임시 파일 정리 try: os.unlink(temp_path) except OSError: pass raise e except Exception as e: print(f"⚠️ 캐시 저장 오류 ({key[:12]}...): {e}") return False def get_cache_stats(self) -> Dict[str, Any]: """캐시 통계 조회""" cache_files = list(self.cache_dir.glob("*.json")) total_size = sum(f.stat().st_size for f in cache_files) return { "cache_dir": str(self.cache_dir), "cache_version": CACHE_VERSION, "extractor_version": EXTRACTOR_VERSION, "file_count": len(cache_files), "total_size_bytes": total_size, "total_size_mb": round(total_size / (1024 * 1024), 2) } def clear_cache(self) -> int: """전체 캐시 삭제, 삭제된 파일 수 반환""" cache_files = list(self.cache_dir.glob("*.json")) count = 0 for f in cache_files: try: f.unlink() count += 1 except OSError: pass return count