# eodi-mcp / src/utils/cache.py
# (Hugging Face upload header: lovelymango — "Upload 19 files" — commit 4c3c97b verified)
"""
Eodi ์บ์‹œ ๊ด€๋ฆฌ์ž - ์ฆ๋ถ„ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ์ฝ˜ํ…์ธ  ๊ธฐ๋ฐ˜ ์บ์‹œ
================================================================
LLM ํ˜ธ์ถœ ๊ฒฐ๊ณผ๋ฅผ ๋กœ์ปฌ์— ์บ์‹ฑํ•˜์—ฌ ๋™์ผ ์ž…๋ ฅ์— ๋Œ€ํ•œ ๋ถˆํ•„์š”ํ•œ ์žฌ์ฒ˜๋ฆฌ๋ฅผ ๋ฐฉ์ง€ํ•ฉ๋‹ˆ๋‹ค.
- Content-addressed cache (SHA256 ํ•ด์‹œ ๊ธฐ๋ฐ˜)
- ์›์ž์  ํŒŒ์ผ ์“ฐ๊ธฐ (tempfile + os.replace)
- ๊ฒ€์ฆ๋œ ๊ฒฐ๊ณผ๋งŒ ์บ์‹œ
- ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ํฌํ•จ ์ €์žฅ
"""
import os
import json
import hashlib
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime, timezone
from src.core.schema import ExtractedKnowledge
# ์บ์‹œ ๋ฒ„์ „ - ์บ์‹œ ๊ตฌ์กฐ๊ฐ€ ๋ณ€๊ฒฝ๋˜๋ฉด ๋ฒ„์ „์„ ์˜ฌ๋ ค ๊ธฐ์กด ์บ์‹œ ๋ฌดํšจํ™”
CACHE_VERSION = "1.0.0"
# ์ถ”์ถœ๊ธฐ ๋ฒ„์ „ - ํ”„๋กฌํ”„ํŠธ/๋กœ์ง ๋ณ€๊ฒฝ ์‹œ ๋ฒ„์ „์„ ์˜ฌ๋ ค ์บ์‹œ ๋ฌดํšจํ™”
EXTRACTOR_VERSION = "1.0.0"
class CacheManager:
"""
์ฆ๋ถ„ ์ฒ˜๋ฆฌ ์บ์‹œ ๊ด€๋ฆฌ์ž.
ํŒŒ์ผ ์ฝ˜ํ…์ธ  ํ•ด์‹œ์™€ ๋ชจ๋ธ ์„ค์ •์„ ๊ธฐ๋ฐ˜์œผ๋กœ ์ถ”์ถœ ๊ฒฐ๊ณผ๋ฅผ ์ €์žฅ/์กฐํšŒํ•ฉ๋‹ˆ๋‹ค.
์บ์‹œ ํ‚ค ๊ตฌ์„ฑ์š”์†Œ:
- ํŒŒ์ผ ๋‚ด์šฉ (normalized)
- ํ”„๋กฌํ”„ํŠธ ํ…œํ”Œ๋ฆฟ ํ•ด์‹œ
- ์Šคํ‚ค๋งˆ ๋ฒ„์ „
- ์ถ”์ถœ๊ธฐ ๋ฒ„์ „
- ๋ชจ๋ธ ํŒŒ๋ผ๋ฏธํ„ฐ (provider, model_name, temperature)
"""
def __init__(self, cache_dir: str = ".eodi_cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
def _normalize_content(self, content: str) -> str:
"""์ฝ˜ํ…์ธ  ์ •๊ทœํ™” - ์ค„ ๋ ๋ฌธ์ž, ๊ณต๋ฐฑ ๋“ฑ ํ‘œ์ค€ํ™”"""
# ์ค„ ๋ ๋ฌธ์ž ํ†ต์ผ (CRLF -> LF)
normalized = content.replace('\r\n', '\n').replace('\r', '\n')
# ํŠธ๋ ˆ์ผ๋ง ๊ณต๋ฐฑ ์ œ๊ฑฐ
lines = [line.rstrip() for line in normalized.split('\n')]
return '\n'.join(lines)
def _generate_key(
self,
file_content: str,
prompt_template: str,
model_config: Dict[str, Any]
) -> str:
"""
๊ณ ์œ  ์บ์‹œ ํ‚ค ์ƒ์„ฑ.
Args:
file_content: ์ •๊ทœํ™”๋œ ๋งˆํฌ๋‹ค์šด ์ฝ˜ํ…์ธ 
prompt_template: ์ถ”์ถœ ํ”„๋กฌํ”„ํŠธ ํ…œํ”Œ๋ฆฟ
model_config: ๋ชจ๋ธ ์„ค์ • (provider, model_name, temperature ๋“ฑ)
Returns:
SHA256 ํ•ด์‹œ ๋ฌธ์ž์—ด
"""
# ์ •๊ทœํ™”๋œ ์ฝ˜ํ…์ธ 
normalized_content = self._normalize_content(file_content)
# ๋ชจ๋ธ ์„ค์ •์„ ์ •๋ ฌ๋œ JSON ๋ฌธ์ž์—ด๋กœ ๋ณ€ํ™˜ (์ผ๊ด€๋œ ํ•ด์‹œ๋ฅผ ์œ„ํ•ด)
sorted_config = json.dumps(model_config, sort_keys=True, ensure_ascii=False)
# ํ”„๋กฌํ”„ํŠธ ํ…œํ”Œ๋ฆฟ ํ•ด์‹œ (๋™์  ๋ถ€๋ถ„ ์ œ์™ธํ•œ ํ…œํ”Œ๋ฆฟ ๊ตฌ์กฐ)
prompt_hash = hashlib.sha256(prompt_template.encode('utf-8')).hexdigest()[:16]
# ๋ชจ๋“  ๊ตฌ์„ฑ์š”์†Œ ๊ฒฐํ•ฉ
combined = (
f"v{CACHE_VERSION}|"
f"e{EXTRACTOR_VERSION}|"
f"p{prompt_hash}|"
f"m{sorted_config}|"
f"c{normalized_content}"
)
return hashlib.sha256(combined.encode('utf-8')).hexdigest()
def _create_cache_entry(
self,
file_content: str,
prompt_template: str,
model_config: Dict[str, Any],
result: ExtractedKnowledge,
validation_warnings: Optional[list] = None
) -> Dict[str, Any]:
"""์บ์‹œ ์—”ํŠธ๋ฆฌ ์ƒ์„ฑ - ๊ฒฐ๊ณผ์™€ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ํฌํ•จ"""
return {
"cache_version": CACHE_VERSION,
"extractor_version": EXTRACTOR_VERSION,
"created_at": datetime.now(timezone.utc).isoformat(),
"input_checksum": hashlib.sha256(file_content.encode('utf-8')).hexdigest(),
"prompt_hash": hashlib.sha256(prompt_template.encode('utf-8')).hexdigest()[:16],
"model_config": model_config,
"validation_status": "passed",
"validation_warnings": validation_warnings or [],
"output": result.model_dump(mode='json') # Pydantic v2 JSON ๋ชจ๋“œ - Decimal/datetime ์•ˆ์ „
}
def get_cached_result(
self,
file_content: str,
prompt_template: str,
model_config: Dict[str, Any]
) -> Optional[ExtractedKnowledge]:
"""
์บ์‹œ๋œ ๊ฒฐ๊ณผ ์กฐํšŒ.
Args:
file_content: ์›๋ณธ ๋งˆํฌ๋‹ค์šด ์ฝ˜ํ…์ธ 
prompt_template: ํ”„๋กฌํ”„ํŠธ ํ…œํ”Œ๋ฆฟ
model_config: ๋ชจ๋ธ ์„ค์ •
Returns:
ExtractedKnowledge ๊ฐ์ฒด ๋˜๋Š” None (์บ์‹œ ๋ฏธ์Šค)
"""
key = self._generate_key(file_content, prompt_template, model_config)
cache_file = self.cache_dir / f"{key}.json"
if not cache_file.exists():
return None
try:
with open(cache_file, 'r', encoding='utf-8') as f:
entry = json.load(f)
# ์บ์‹œ ๋ฒ„์ „ ํ™•์ธ - ๋ฒ„์ „ ๋ถˆ์ผ์น˜ ์‹œ ๋ฌดํšจํ™”
if entry.get("cache_version") != CACHE_VERSION:
print(f"โš ๏ธ ์บ์‹œ ๋ฒ„์ „ ๋ถˆ์ผ์น˜, ๋ฌดํšจํ™”: {entry.get('cache_version')} != {CACHE_VERSION}")
cache_file.unlink() # ์˜ค๋ž˜๋œ ์บ์‹œ ์‚ญ์ œ
return None
# ์ถ”์ถœ๊ธฐ ๋ฒ„์ „ ํ™•์ธ
if entry.get("extractor_version") != EXTRACTOR_VERSION:
print(f"โš ๏ธ ์ถ”์ถœ๊ธฐ ๋ฒ„์ „ ๋ถˆ์ผ์น˜, ๋ฌดํšจํ™”: {entry.get('extractor_version')} != {EXTRACTOR_VERSION}")
cache_file.unlink()
return None
# ๊ฒ€์ฆ ์ƒํƒœ ํ™•์ธ - ์‹คํŒจํ•œ ๊ฒฐ๊ณผ๋Š” ์‚ฌ์šฉํ•˜์ง€ ์•Š์Œ
if entry.get("validation_status") != "passed":
print(f"โš ๏ธ ์บ์‹œ๋œ ๊ฒฐ๊ณผ๊ฐ€ ๊ฒ€์ฆ ์‹คํŒจ ์ƒํƒœ, ๋ฌดํšจํ™”")
cache_file.unlink()
return None
# Pydantic ๋ชจ๋ธ๋กœ ๋ณ€ํ™˜
return ExtractedKnowledge.model_validate(entry["output"])
except (json.JSONDecodeError, KeyError, ValueError) as e:
print(f"โš ๏ธ ์บ์‹œ ๋กœ๋“œ ์˜ค๋ฅ˜ ({key[:12]}...): {e}")
# ์†์ƒ๋œ ์บ์‹œ ํŒŒ์ผ ์‚ญ์ œ
try:
cache_file.unlink()
except OSError:
pass
return None
def save_result(
self,
file_content: str,
prompt_template: str,
model_config: Dict[str, Any],
result: ExtractedKnowledge,
validation_warnings: Optional[list] = None
) -> bool:
"""
์ถ”์ถœ ๊ฒฐ๊ณผ๋ฅผ ์บ์‹œ์— ์ €์žฅ (์›์ž์  ์“ฐ๊ธฐ).
Args:
file_content: ์›๋ณธ ๋งˆํฌ๋‹ค์šด ์ฝ˜ํ…์ธ 
prompt_template: ํ”„๋กฌํ”„ํŠธ ํ…œํ”Œ๋ฆฟ
model_config: ๋ชจ๋ธ ์„ค์ •
result: ๊ฒ€์ฆ ์™„๋ฃŒ๋œ ExtractedKnowledge
validation_warnings: ๊ฒ€์ฆ ๊ฒฝ๊ณ  ๋ชฉ๋ก (์„ ํƒ)
Returns:
์ €์žฅ ์„ฑ๊ณต ์—ฌ๋ถ€
"""
key = self._generate_key(file_content, prompt_template, model_config)
cache_file = self.cache_dir / f"{key}.json"
try:
# ์บ์‹œ ์—”ํŠธ๋ฆฌ ์ƒ์„ฑ
entry = self._create_cache_entry(
file_content,
prompt_template,
model_config,
result,
validation_warnings
)
# ์›์ž์  ์“ฐ๊ธฐ: tempfile -> os.replace
# ์ด๋ ‡๊ฒŒ ํ•˜๋ฉด ์ค‘๊ฐ„์— ํ”„๋กœ์„ธ์Šค๊ฐ€ ์ฃฝ์–ด๋„ ์บ์‹œ ํŒŒ์ผ์ด ๊นจ์ง€์ง€ ์•Š์Œ
fd, temp_path = tempfile.mkstemp(
suffix='.json.tmp',
dir=self.cache_dir,
prefix=f'{key[:8]}_'
)
try:
with os.fdopen(fd, 'w', encoding='utf-8') as f:
json.dump(entry, f, ensure_ascii=False, indent=2)
# ์›์ž์  ๊ต์ฒด (POSIX ์‹œ์Šคํ…œ์—์„œ atomic)
os.replace(temp_path, cache_file)
return True
except Exception as e:
# ์ž„์‹œ ํŒŒ์ผ ์ •๋ฆฌ
try:
os.unlink(temp_path)
except OSError:
pass
raise e
except Exception as e:
print(f"โš ๏ธ ์บ์‹œ ์ €์žฅ ์˜ค๋ฅ˜ ({key[:12]}...): {e}")
return False
def get_cache_stats(self) -> Dict[str, Any]:
"""์บ์‹œ ํ†ต๊ณ„ ์กฐํšŒ"""
cache_files = list(self.cache_dir.glob("*.json"))
total_size = sum(f.stat().st_size for f in cache_files)
return {
"cache_dir": str(self.cache_dir),
"cache_version": CACHE_VERSION,
"extractor_version": EXTRACTOR_VERSION,
"file_count": len(cache_files),
"total_size_bytes": total_size,
"total_size_mb": round(total_size / (1024 * 1024), 2)
}
def clear_cache(self) -> int:
"""์ „์ฒด ์บ์‹œ ์‚ญ์ œ, ์‚ญ์ œ๋œ ํŒŒ์ผ ์ˆ˜ ๋ฐ˜ํ™˜"""
cache_files = list(self.cache_dir.glob("*.json"))
count = 0
for f in cache_files:
try:
f.unlink()
count += 1
except OSError:
pass
return count