"""Per-task 답변 캐시. 채점 서버에 답을 한 번 전송하더라도 점수 확인·디버깅을 위해 다시 돌려야 할 때가 있다. 한 문제마다 LLM 호출이 비싸므로 task_id 단위로 답을 디스크에 보관해 재실행 시 캐시된 문제는 스킵한다. 한 문제에서 죽으면 전체가 멈추던 부수 문제도 함께 완화 — 다음 실행 때 그 문제만 다시 풀면 된다. 저장 위치: .cache/answers.json (이미 디렉터리 존재). 형식: {task_id: {"question": str, "answer": str}} 원자적 쓰기: 임시 파일 → os.replace 로 동시쓰기/크래시 안전. 캐시 무효화는 파일 직접 삭제(rm .cache/answers.json) 또는 clear_cache() 사용. """ import json import os import tempfile from pathlib import Path _CACHE_PATH = Path(".cache") / "answers.json" def load_cache() -> dict: """디스크에서 캐시 로드. 파일 없거나 깨지면 빈 dict.""" if not _CACHE_PATH.exists(): return {} try: return json.loads(_CACHE_PATH.read_text(encoding="utf-8")) except Exception as e: print(f"Warning: cache load failed ({e}); starting empty.") return {} def save_answer(task_id: str, question: str, answer: str) -> None: """task_id 답을 캐시에 추가하고 원자적으로 저장. AGENT_ERROR 결과는 캐시 안 함 — 재실행 시 다시 시도해야 하는 항목이라.""" if not task_id or is_retryable_answer(answer): return cache = load_cache() cache[task_id] = {"question": question, "answer": answer} _CACHE_PATH.parent.mkdir(parents=True, exist_ok=True) # 같은 디렉터리에 임시 파일 → os.replace 로 OS-level atomic rename. fd, tmp = tempfile.mkstemp(prefix="answers.", suffix=".tmp", dir=str(_CACHE_PATH.parent)) try: with os.fdopen(fd, "w", encoding="utf-8") as f: json.dump(cache, f, ensure_ascii=False, indent=2) os.replace(tmp, _CACHE_PATH) except Exception: # 실패 시 임시 파일 정리 후 재발생. try: os.unlink(tmp) except OSError: pass raise def get_cached_answer(task_id: str, cache: dict | None = None) -> str | None: """캐시에서 task_id의 답을 꺼내거나 None. cache 인자를 주면 매번 디스크 재로딩 안 함(루프에서 유용).""" if cache is None: cache = load_cache() entry = cache.get(task_id) if entry and isinstance(entry, dict): return entry.get("answer") return None def is_retryable_answer(answer: str | None) -> bool: """다음 실행에서 다시 시도해야 할 답변인지 판별.""" if answer is None: return True a = str(answer).strip() if not a: return True upper = a.upper() return ( upper == "UNKNOWN" or upper == "UNK" or upper.startswith("AGENT_ERROR:") or upper.startswith("AGENT ERROR:") or "CANNOT ANSWER" in upper or "NO FINAL ANSWER" in upper ) def clear_cache() -> None: """수동 호출용. 자동 호출 안 함.""" if _CACHE_PATH.exists(): _CACHE_PATH.unlink() def invalidate_tasks(task_ids) -> int: """주어진 task_id 목록만 캐시에서 제거하고 원자적으로 저장. 개선 단계마다 "오답이었던 task_id만 재시도" 하기 위한 헬퍼. 정답으로 캐시된 항목은 보존해 토큰을 아끼면서, 변경 효과만 깨끗이 측정. Returns: 실제로 제거된 항목 수. """ cache = load_cache() removed = 0 for tid in task_ids: if tid in cache: del cache[tid] removed += 1 if removed == 0: return 0 _CACHE_PATH.parent.mkdir(parents=True, exist_ok=True) fd, tmp = tempfile.mkstemp(prefix="answers.", suffix=".tmp", dir=str(_CACHE_PATH.parent)) try: with os.fdopen(fd, "w", encoding="utf-8") as f: json.dump(cache, f, ensure_ascii=False, indent=2) os.replace(tmp, _CACHE_PATH) except Exception: try: os.unlink(tmp) except OSError: pass raise return removed